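You are viewing a plain-text version of this content. The canonical version is the original post in the commits@hbase.apache.org mailing-list archive (the "here" link was not preserved in this plain-text rendering).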
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/02 02:59:04 UTC
[14/43] hbase git commit: HBASE-20497 The getRecoveredQueueStartPos
always returns 0 in RecoveredReplicationSourceShipper
HBASE-20497 The getRecoveredQueueStartPos always returns 0 in RecoveredReplicationSourceShipper
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a1363038
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a1363038
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a1363038
Branch: refs/heads/HBASE-19064
Commit: a13630383335371dee338f4e2b42ac0f5de57667
Parents: 59f6ecd
Author: huzheng <op...@gmail.com>
Authored: Sat Apr 28 11:14:43 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Apr 28 20:50:30 2018 +0800
----------------------------------------------------------------------
.../RecoveredReplicationSourceShipper.java | 26 +-
.../replication/TestReplicationSource.java | 296 -----------------
.../regionserver/TestReplicationSource.java | 323 +++++++++++++++++++
3 files changed, 335 insertions(+), 310 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1363038/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index d74211e..91109cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -60,7 +60,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
}
@Override
- long getStartPosition() {
+ public long getStartPosition() {
long startPosition = getRecoveredQueueStartPos();
int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) {
@@ -79,32 +79,30 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
// normally has a position (unless the RS failed between 2 logs)
private long getRecoveredQueueStartPos() {
long startPosition = 0;
- String peerClusterZnode = source.getQueueId();
+ String peerClusterZNode = source.getQueueId();
try {
- startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(),
- peerClusterZnode, this.queue.peek().getName());
- if (LOG.isTraceEnabled()) {
- LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " +
- startPosition);
- }
+ startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
+ peerClusterZNode, this.queue.peek().getName());
+ LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(),
+ startPosition);
} catch (ReplicationException e) {
- terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
+ terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e);
}
return startPosition;
}
private void terminate(String reason, Exception cause) {
if (cause == null) {
- LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
-
+ LOG.info("Closing worker for wal group {} because: {}", this.walGroupId, reason);
} else {
- LOG.error("Closing worker for wal group " + this.walGroupId
- + " because an error occurred: " + reason, cause);
+ // SLF4J treats a trailing Throwable argument as the exception and logs its stack trace.
+ LOG.error("Closing worker for wal group {} because an error occurred: {}", this.walGroupId,
+ reason, cause);
}
entryReader.interrupt();
Threads.shutdown(entryReader, sleepForRetries);
this.interrupt();
Threads.shutdown(this, sleepForRetries);
- LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
+ LOG.info("ReplicationSourceWorker {} terminated", this.getName());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1363038/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
deleted file mode 100644
index 1bb361b..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.util.OptionalLong;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.Waiter.Predicate;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALProvider;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationSource {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationSource.class);
-
- private static final Logger LOG =
- LoggerFactory.getLogger(TestReplicationSource.class);
- private final static HBaseTestingUtility TEST_UTIL =
- new HBaseTestingUtility();
- private final static HBaseTestingUtility TEST_UTIL_PEER =
- new HBaseTestingUtility();
- private static FileSystem FS;
- private static Path oldLogDir;
- private static Path logDir;
- private static Configuration conf = TEST_UTIL.getConfiguration();
-
- /**
- * @throws java.lang.Exception
- */
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- TEST_UTIL.startMiniDFSCluster(1);
- FS = TEST_UTIL.getDFSCluster().getFileSystem();
- Path rootDir = TEST_UTIL.createRootDir();
- oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true);
- logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
- if (FS.exists(logDir)) FS.delete(logDir, true);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL_PEER.shutdownMiniHBaseCluster();
- TEST_UTIL.shutdownMiniHBaseCluster();
- TEST_UTIL.shutdownMiniDFSCluster();
- }
-
- /**
- * Sanity check that we can move logs around while we are reading
- * from them. Should this test fail, ReplicationSource would have a hard
- * time reading logs that are being archived.
- * @throws Exception
- */
- @Test
- public void testLogMoving() throws Exception{
- Path logPath = new Path(logDir, "log");
- if (!FS.exists(logDir)) FS.mkdirs(logDir);
- if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
- WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
- TEST_UTIL.getConfiguration());
- for(int i = 0; i < 3; i++) {
- byte[] b = Bytes.toBytes(Integer.toString(i));
- KeyValue kv = new KeyValue(b,b,b);
- WALEdit edit = new WALEdit();
- edit.add(kv);
- WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
- HConstants.DEFAULT_CLUSTER_ID);
- writer.append(new WAL.Entry(key, edit));
- writer.sync(false);
- }
- writer.close();
-
- WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
- WAL.Entry entry = reader.next();
- assertNotNull(entry);
-
- Path oldLogPath = new Path(oldLogDir, "log");
- FS.rename(logPath, oldLogPath);
-
- entry = reader.next();
- assertNotNull(entry);
-
- entry = reader.next();
- entry = reader.next();
-
- assertNull(entry);
- reader.close();
- }
-
- /**
- * Tests that {@link ReplicationSource#terminate(String)} will timeout properly
- */
- @Test
- public void testTerminateTimeout() throws Exception {
- ReplicationSource source = new ReplicationSource();
- ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
- @Override
- protected void doStart() {
- notifyStarted();
- }
-
- @Override
- protected void doStop() {
- // not calling notifyStopped() here causes the caller of stop() to get a Future that never
- // completes
- }
- };
- replicationEndpoint.start();
- ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
- Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
- Configuration testConf = HBaseConfiguration.create();
- testConf.setInt("replication.source.maxretriesmultiplier", 1);
- ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
- Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
- source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
- p -> OptionalLong.empty(), null);
- ExecutorService executor = Executors.newSingleThreadExecutor();
- Future<?> future = executor.submit(new Runnable() {
-
- @Override
- public void run() {
- source.terminate("testing source termination");
- }
- });
- long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
- Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {
-
- @Override
- public boolean evaluate() throws Exception {
- return future.isDone();
- }
-
- });
-
- }
-
- /**
- * Tests that recovered queues are preserved on a regionserver shutdown.
- * See HBASE-18192
- * @throws Exception
- */
- @Test
- public void testServerShutdownRecoveredQueue() throws Exception {
- try {
- // Ensure single-threaded WAL
- conf.set("hbase.wal.provider", "defaultProvider");
- conf.setInt("replication.sleep.before.failover", 2000);
- // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
- conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
- MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
- TEST_UTIL_PEER.startMiniCluster(1);
-
- HRegionServer serverA = cluster.getRegionServer(0);
- final ReplicationSourceManager managerA =
- ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
- HRegionServer serverB = cluster.getRegionServer(1);
- final ReplicationSourceManager managerB =
- ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
- final Admin admin = TEST_UTIL.getAdmin();
-
- final String peerId = "TestPeer";
- admin.addReplicationPeer(peerId,
- new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()));
- // Wait for replication sources to come up
- Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
- @Override public boolean evaluate() throws Exception {
- return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
- }
- });
- // Disabling peer makes sure there is at least one log to claim when the server dies
- // The recovered queue will also stay there until the peer is disabled even if the
- // WALs it contains have no data.
- admin.disableReplicationPeer(peerId);
-
- // Stopping serverA
- // It's queues should be claimed by the only other alive server i.e. serverB
- cluster.stopRegionServer(serverA.getServerName());
- Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
- @Override public boolean evaluate() throws Exception {
- return managerB.getOldSources().size() == 1;
- }
- });
-
- final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
- serverC.waitForServerOnline();
- Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
- @Override public boolean evaluate() throws Exception {
- return serverC.getReplicationSourceService() != null;
- }
- });
- final ReplicationSourceManager managerC =
- ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
- // Sanity check
- assertEquals(0, managerC.getOldSources().size());
-
- // Stopping serverB
- // Now serverC should have two recovered queues:
- // 1. The serverB's normal queue
- // 2. serverA's recovered queue on serverB
- cluster.stopRegionServer(serverB.getServerName());
- Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
- @Override public boolean evaluate() throws Exception {
- return managerC.getOldSources().size() == 2;
- }
- });
- admin.enableReplicationPeer(peerId);
- Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
- @Override public boolean evaluate() throws Exception {
- return managerC.getOldSources().size() == 0;
- }
- });
- } finally {
- conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
- }
- }
-
- /**
- * Regionserver implementation that adds a delay on the graceful shutdown.
- */
- public static class ShutdownDelayRegionServer extends HRegionServer {
- public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
- super(conf);
- }
-
- @Override
- protected void stopServiceThreads() {
- // Add a delay before service threads are shutdown.
- // This will keep the zookeeper connection alive for the duration of the delay.
- LOG.info("Adding a delay to the regionserver shutdown");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ex) {
- LOG.error("Interrupted while sleeping");
- }
- super.stopServiceThreads();
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1363038/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
new file mode 100644
index 0000000..274ccab
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.Predicate;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestReplicationSource {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationSource.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestReplicationSource.class);
+ private final static HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+ private final static HBaseTestingUtility TEST_UTIL_PEER =
+ new HBaseTestingUtility();
+ private static FileSystem FS;
+ private static Path oldLogDir;
+ private static Path logDir;
+ private static Configuration conf = TEST_UTIL.getConfiguration();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniDFSCluster(1);
+ FS = TEST_UTIL.getDFSCluster().getFileSystem();
+ Path rootDir = TEST_UTIL.createRootDir();
+ oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true);
+ logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+ if (FS.exists(logDir)) FS.delete(logDir, true);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL_PEER.shutdownMiniHBaseCluster();
+ TEST_UTIL.shutdownMiniHBaseCluster();
+ TEST_UTIL.shutdownMiniDFSCluster();
+ }
+
+ /**
+ * Sanity check that we can keep reading a log while it is moved to the archive directory:
+ * all three entries written before the rename must still be readable afterwards. Should this
+ * test fail, ReplicationSource would have a hard time reading logs that are being archived.
+ */
+ @Test
+ public void testLogMoving() throws Exception {
+ Path logPath = new Path(logDir, "log");
+ if (!FS.exists(logDir)) FS.mkdirs(logDir);
+ if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
+ WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
+ TEST_UTIL.getConfiguration());
+ for(int i = 0; i < 3; i++) {
+ byte[] b = Bytes.toBytes(Integer.toString(i));
+ KeyValue kv = new KeyValue(b,b,b);
+ WALEdit edit = new WALEdit();
+ edit.add(kv);
+ WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
+ HConstants.DEFAULT_CLUSTER_ID);
+ writer.append(new WAL.Entry(key, edit));
+ writer.sync(false);
+ }
+ writer.close();
+
+ WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
+ WAL.Entry entry = reader.next();
+ assertNotNull(entry);
+
+ Path oldLogPath = new Path(oldLogDir, "log");
+ FS.rename(logPath, oldLogPath);
+
+ entry = reader.next();
+ assertNotNull(entry);
+
+ entry = reader.next();
+ assertNotNull(entry);
+ // Only three entries were written, so the reader must now be exhausted.
+ assertNull(reader.next());
+ reader.close();
+ }
+
+ /**
+ * Tests that {@link ReplicationSource#terminate(String)} completes within the expected timeout.
+ */
+ @Test
+ public void testTerminateTimeout() throws Exception {
+ ReplicationSource source = new ReplicationSource();
+ ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
+ @Override
+ protected void doStart() {
+ notifyStarted();
+ }
+
+ @Override
+ protected void doStop() {
+ // not calling notifyStopped() here causes the caller of stop() to get a Future that never
+ // completes
+ }
+ };
+ replicationEndpoint.start();
+ ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+ Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+ Configuration testConf = HBaseConfiguration.create();
+ testConf.setInt("replication.source.maxretriesmultiplier", 1);
+ ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
+ Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+ source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
+ p -> OptionalLong.empty(), null);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ Future<?> future = executor.submit(() -> source.terminate("testing source termination"));
+ long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
+ // terminate() must finish within twice the configured retry sleep.
+ Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return future.isDone();
+ }
+ });
+ } finally {
+ // Stop the worker thread so the test never leaks it, even if the wait above times out.
+ executor.shutdownNow();
+ }
+ }
+
+ /**
+ * Tests that recovered queues are preserved on a regionserver shutdown.
+ * See HBASE-18192.
+ */
+ @Test
+ public void testServerShutdownRecoveredQueue() throws Exception {
+ try {
+ // Ensure single-threaded WAL
+ conf.set("hbase.wal.provider", "defaultProvider");
+ conf.setInt("replication.sleep.before.failover", 2000);
+ // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
+ conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
+ MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
+ TEST_UTIL_PEER.startMiniCluster(1);
+
+ HRegionServer serverA = cluster.getRegionServer(0);
+ final ReplicationSourceManager managerA =
+ ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
+ HRegionServer serverB = cluster.getRegionServer(1);
+ final ReplicationSourceManager managerB =
+ ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
+ final Admin admin = TEST_UTIL.getAdmin();
+
+ final String peerId = "TestPeer";
+ admin.addReplicationPeer(peerId,
+ ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
+ // Wait for replication sources to come up
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
+ }
+ });
+ // Disabling the peer makes sure there is at least one log to claim when the server dies.
+ // The recovered queue will also stay there as long as the peer is disabled, even if the
+ // WALs it contains have no data.
+ admin.disableReplicationPeer(peerId);
+
+ // Stopping serverA
+ // Its queues should be claimed by the only other live server, i.e. serverB.
+ cluster.stopRegionServer(serverA.getServerName());
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return managerB.getOldSources().size() == 1;
+ }
+ });
+
+ final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
+ serverC.waitForServerOnline();
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return serverC.getReplicationSourceService() != null;
+ }
+ });
+ final ReplicationSourceManager managerC =
+ ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
+ // Sanity check
+ assertEquals(0, managerC.getOldSources().size());
+
+ // Stopping serverB
+ // Now serverC should have two recovered queues:
+ // 1. serverB's normal queue
+ // 2. serverA's recovered queue on serverB
+ cluster.stopRegionServer(serverB.getServerName());
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return managerC.getOldSources().size() == 2;
+ }
+ });
+ admin.enableReplicationPeer(peerId);
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return managerC.getOldSources().size() == 0;
+ }
+ });
+ } finally {
+ conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
+ }
+ }
+
+ /**
+ * Regionserver implementation that adds a delay on the graceful shutdown.
+ */
+ public static class ShutdownDelayRegionServer extends HRegionServer {
+ public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
+ super(conf);
+ }
+
+ @Override
+ protected void stopServiceThreads() {
+ // Add a delay before service threads are shutdown.
+ // This will keep the zookeeper connection alive for the duration of the delay.
+ LOG.info("Adding a delay to the regionserver shutdown");
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ex) {
+ LOG.error("Interrupted while sleeping");
+ }
+ super.stopServiceThreads();
+ }
+ }
+
+ // HBASE-20497: the start position must come from the queue of the server that claimed it.
+ @Test
+ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
+ String walGroupId = "fake-wal-group-id";
+ ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
+ ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
+ PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
+ queue.put(new Path("/www/html/test"));
+ RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
+ Server server = Mockito.mock(Server.class);
+ Mockito.when(server.getServerName()).thenReturn(serverName);
+ Mockito.when(source.getServer()).thenReturn(server);
+ Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
+ ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
+ Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
+ .thenReturn(1001L);
+ Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
+ .thenReturn(-1L);
+ Configuration shipperConf = new Configuration(conf); // private copy: shared conf untouched
+ shipperConf.setInt("replication.source.maxretriesmultiplier", -1);
+ RecoveredReplicationSourceShipper shipper =
+ new RecoveredReplicationSourceShipper(shipperConf, walGroupId, queue, source, storage);
+ Assert.assertEquals(1001L, shipper.getStartPosition());
+ }
+}
+