Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/02 02:59:04 UTC

[14/43] hbase git commit: HBASE-20497 The getRecoveredQueueStartPos always returns 0 in RecoveredReplicationSourceShipper

HBASE-20497 The getRecoveredQueueStartPos always returns 0 in RecoveredReplicationSourceShipper

Signed-off-by: zhangduo <zh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a1363038
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a1363038
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a1363038

Branch: refs/heads/HBASE-19064
Commit: a13630383335371dee338f4e2b42ac0f5de57667
Parents: 59f6ecd
Author: huzheng <op...@gmail.com>
Authored: Sat Apr 28 11:14:43 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Apr 28 20:50:30 2018 +0800

----------------------------------------------------------------------
 .../RecoveredReplicationSourceShipper.java      |  26 +-
 .../replication/TestReplicationSource.java      | 296 -----------------
 .../regionserver/TestReplicationSource.java     | 323 +++++++++++++++++++
 3 files changed, 335 insertions(+), 310 deletions(-)
----------------------------------------------------------------------
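
For context on the fix: a recovered replication queue is tracked under the live region server that claims it, while its queue id still names the dead server whose WALs it replays. The old shipper asked the queue storage for the last shipped WAL position keyed by that dead server (source.getServerWALsBelongTo()), so the lookup missed and getRecoveredQueueStartPos() always came back with 0. The fix keys the lookup by the claiming server itself (source.getServer().getServerName()). Below is a condensed, self-contained sketch of that behaviour against a mocked ReplicationQueueStorage; the class name, host names, queue id and WAL name are illustrative only, and the committed test (testRecoveredReplicationSourceShipperGetPosition, further down) exercises the same thing through the real shipper.

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.mockito.Mockito;

public class RecoveredQueuePositionSketch {
  public static void main(String[] args) throws Exception {
    // The server that claimed the recovered queue vs. the dead server whose WALs it carries.
    ServerName liveServer = ServerName.valueOf("live.example.com", 16020, 1L);
    ServerName deadServer = ServerName.valueOf("dead.example.com", 16020, 2L);
    // A recovered queue id keeps the dead server's name; the WAL name here is made up.
    String queueId = "TestPeer-dead.example.com,16020,2";
    String wal = "dead.example.com%2C16020%2C2.1524679704419";

    ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
    // Positions of a claimed queue are tracked under the live server that now owns it...
    Mockito.when(storage.getWALPosition(Mockito.eq(liveServer), Mockito.anyString(), Mockito.anyString()))
        .thenReturn(1001L);
    // ...so a lookup keyed by the dead server finds nothing useful.
    Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.anyString(), Mockito.anyString()))
        .thenReturn(-1L);

    // Lookup key used before the fix: the dead server -> no stored position.
    System.out.println(storage.getWALPosition(deadServer, queueId, wal));   // -1
    // Lookup key used after the fix: the owning server -> the real start position.
    System.out.println(storage.getWALPosition(liveServer, queueId, wal));   // 1001
  }
}

The exact storage layout varies by branch; the point is only that a claimed queue's position has to be looked up under the server that currently owns the queue, not under the server it was recovered from.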


http://git-wip-us.apache.org/repos/asf/hbase/blob/a1363038/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index d74211e..91109cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -60,7 +60,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   }
 
   @Override
-  long getStartPosition() {
+  public long getStartPosition() {
     long startPosition = getRecoveredQueueStartPos();
     int numRetries = 0;
     while (numRetries <= maxRetriesMultiplier) {
@@ -79,32 +79,30 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   // normally has a position (unless the RS failed between 2 logs)
   private long getRecoveredQueueStartPos() {
     long startPosition = 0;
-    String peerClusterZnode = source.getQueueId();
+    String peerClusterZNode = source.getQueueId();
     try {
-      startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(),
-        peerClusterZnode, this.queue.peek().getName());
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " +
-          startPosition);
-      }
+      startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
+        peerClusterZNode, this.queue.peek().getName());
+      LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(),
+        startPosition);
     } catch (ReplicationException e) {
-      terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
+      terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e);
     }
     return startPosition;
   }
 
   private void terminate(String reason, Exception cause) {
     if (cause == null) {
-      LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
-
+      LOG.info("Closing worker for wal group {} because: {}", this.walGroupId, reason);
     } else {
-      LOG.error("Closing worker for wal group " + this.walGroupId
-          + " because an error occurred: " + reason, cause);
+      LOG.error(
+        "Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason,
+        cause);
     }
     entryReader.interrupt();
     Threads.shutdown(entryReader, sleepForRetries);
     this.interrupt();
     Threads.shutdown(this, sleepForRetries);
-    LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
+    LOG.info("ReplicationSourceWorker {} terminated", this.getName());
   }
 }
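
Besides the position lookup, the hunk above also moves the shipper's logging from string concatenation (guarded by LOG.isTraceEnabled()) to SLF4J's parameterized form, which only formats the message once the level check passes and so lets the explicit guard go away. A minimal stand-alone illustration, with a hypothetical class name that is not part of the commit:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

  public static void main(String[] args) {
    String walGroupId = "group-1";
    long startPosition = 1001L;

    // Old style: the message string is built even when TRACE is disabled,
    // hence the explicit guard around the concatenation.
    if (LOG.isTraceEnabled()) {
      LOG.trace("Recovered queue started with log " + walGroupId + " at position " + startPosition);
    }

    // New style: placeholders are only substituted when TRACE is enabled,
    // so the guard can be dropped without paying the formatting cost.
    LOG.trace("Recovered queue started with log {} at position {}", walGroupId, startPosition);
  }
}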

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1363038/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
deleted file mode 100644
index 1bb361b..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.util.OptionalLong;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.Waiter.Predicate;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALProvider;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationSource {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestReplicationSource.class);
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestReplicationSource.class);
-  private final static HBaseTestingUtility TEST_UTIL =
-      new HBaseTestingUtility();
-  private final static HBaseTestingUtility TEST_UTIL_PEER =
-      new HBaseTestingUtility();
-  private static FileSystem FS;
-  private static Path oldLogDir;
-  private static Path logDir;
-  private static Configuration conf = TEST_UTIL.getConfiguration();
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.startMiniDFSCluster(1);
-    FS = TEST_UTIL.getDFSCluster().getFileSystem();
-    Path rootDir = TEST_UTIL.createRootDir();
-    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true);
-    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
-    if (FS.exists(logDir)) FS.delete(logDir, true);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
-    TEST_UTIL.shutdownMiniHBaseCluster();
-    TEST_UTIL.shutdownMiniDFSCluster();
-  }
-
-  /**
-   * Sanity check that we can move logs around while we are reading
-   * from them. Should this test fail, ReplicationSource would have a hard
-   * time reading logs that are being archived.
-   * @throws Exception
-   */
-  @Test
-  public void testLogMoving() throws Exception{
-    Path logPath = new Path(logDir, "log");
-    if (!FS.exists(logDir)) FS.mkdirs(logDir);
-    if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
-    WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
-        TEST_UTIL.getConfiguration());
-    for(int i = 0; i < 3; i++) {
-      byte[] b = Bytes.toBytes(Integer.toString(i));
-      KeyValue kv = new KeyValue(b,b,b);
-      WALEdit edit = new WALEdit();
-      edit.add(kv);
-      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
-          HConstants.DEFAULT_CLUSTER_ID);
-      writer.append(new WAL.Entry(key, edit));
-      writer.sync(false);
-    }
-    writer.close();
-
-    WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
-    WAL.Entry entry = reader.next();
-    assertNotNull(entry);
-
-    Path oldLogPath = new Path(oldLogDir, "log");
-    FS.rename(logPath, oldLogPath);
-
-    entry = reader.next();
-    assertNotNull(entry);
-
-    entry = reader.next();
-    entry = reader.next();
-
-    assertNull(entry);
-    reader.close();
-  }
-
-  /**
-   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly
-   */
-  @Test
-  public void testTerminateTimeout() throws Exception {
-    ReplicationSource source = new ReplicationSource();
-    ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
-      @Override
-      protected void doStart() {
-        notifyStarted();
-      }
-
-      @Override
-      protected void doStop() {
-        // not calling notifyStopped() here causes the caller of stop() to get a Future that never
-        // completes
-      }
-    };
-    replicationEndpoint.start();
-    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
-    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
-    Configuration testConf = HBaseConfiguration.create();
-    testConf.setInt("replication.source.maxretriesmultiplier", 1);
-    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
-    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
-    source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
-      p -> OptionalLong.empty(), null);
-    ExecutorService executor = Executors.newSingleThreadExecutor();
-    Future<?> future = executor.submit(new Runnable() {
-
-      @Override
-      public void run() {
-        source.terminate("testing source termination");
-      }
-    });
-    long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
-    Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {
-
-      @Override
-      public boolean evaluate() throws Exception {
-        return future.isDone();
-      }
-
-    });
-
-  }
-
-  /**
-   * Tests that recovered queues are preserved on a regionserver shutdown.
-   * See HBASE-18192
-   * @throws Exception
-   */
-  @Test
-  public void testServerShutdownRecoveredQueue() throws Exception {
-    try {
-      // Ensure single-threaded WAL
-      conf.set("hbase.wal.provider", "defaultProvider");
-      conf.setInt("replication.sleep.before.failover", 2000);
-      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
-      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
-      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
-      TEST_UTIL_PEER.startMiniCluster(1);
-
-      HRegionServer serverA = cluster.getRegionServer(0);
-      final ReplicationSourceManager managerA =
-          ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
-      HRegionServer serverB = cluster.getRegionServer(1);
-      final ReplicationSourceManager managerB =
-          ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
-      final Admin admin = TEST_UTIL.getAdmin();
-
-      final String peerId = "TestPeer";
-      admin.addReplicationPeer(peerId,
-          new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()));
-      // Wait for replication sources to come up
-      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-        @Override public boolean evaluate() throws Exception {
-          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
-        }
-      });
-      // Disabling peer makes sure there is at least one log to claim when the server dies
-      // The recovered queue will also stay there until the peer is disabled even if the
-      // WALs it contains have no data.
-      admin.disableReplicationPeer(peerId);
-
-      // Stopping serverA
-      // It's queues should be claimed by the only other alive server i.e. serverB
-      cluster.stopRegionServer(serverA.getServerName());
-      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-        @Override public boolean evaluate() throws Exception {
-          return managerB.getOldSources().size() == 1;
-        }
-      });
-
-      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
-      serverC.waitForServerOnline();
-      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-        @Override public boolean evaluate() throws Exception {
-          return serverC.getReplicationSourceService() != null;
-        }
-      });
-      final ReplicationSourceManager managerC =
-          ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
-      // Sanity check
-      assertEquals(0, managerC.getOldSources().size());
-
-      // Stopping serverB
-      // Now serverC should have two recovered queues:
-      // 1. The serverB's normal queue
-      // 2. serverA's recovered queue on serverB
-      cluster.stopRegionServer(serverB.getServerName());
-      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-        @Override public boolean evaluate() throws Exception {
-          return managerC.getOldSources().size() == 2;
-        }
-      });
-      admin.enableReplicationPeer(peerId);
-      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-        @Override public boolean evaluate() throws Exception {
-          return managerC.getOldSources().size() == 0;
-        }
-      });
-    } finally {
-      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
-    }
-  }
-
-  /**
-   * Regionserver implementation that adds a delay on the graceful shutdown.
-   */
-  public static class ShutdownDelayRegionServer extends HRegionServer {
-    public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
-      super(conf);
-    }
-
-    @Override
-    protected void stopServiceThreads() {
-      // Add a delay before service threads are shutdown.
-      // This will keep the zookeeper connection alive for the duration of the delay.
-      LOG.info("Adding a delay to the regionserver shutdown");
-      try {
-        Thread.sleep(2000);
-      } catch (InterruptedException ex) {
-        LOG.error("Interrupted while sleeping");
-      }
-      super.stopServiceThreads();
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1363038/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
new file mode 100644
index 0000000..274ccab
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.Predicate;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestReplicationSource {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationSource.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestReplicationSource.class);
+  private final static HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+  private final static HBaseTestingUtility TEST_UTIL_PEER =
+      new HBaseTestingUtility();
+  private static FileSystem FS;
+  private static Path oldLogDir;
+  private static Path logDir;
+  private static Configuration conf = TEST_UTIL.getConfiguration();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniDFSCluster(1);
+    FS = TEST_UTIL.getDFSCluster().getFileSystem();
+    Path rootDir = TEST_UTIL.createRootDir();
+    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true);
+    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+    if (FS.exists(logDir)) FS.delete(logDir, true);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniDFSCluster();
+  }
+
+  /**
+   * Sanity check that we can move logs around while we are reading
+   * from them. Should this test fail, ReplicationSource would have a hard
+   * time reading logs that are being archived.
+   */
+  @Test
+  public void testLogMoving() throws Exception{
+    Path logPath = new Path(logDir, "log");
+    if (!FS.exists(logDir)) FS.mkdirs(logDir);
+    if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
+    WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
+        TEST_UTIL.getConfiguration());
+    for(int i = 0; i < 3; i++) {
+      byte[] b = Bytes.toBytes(Integer.toString(i));
+      KeyValue kv = new KeyValue(b,b,b);
+      WALEdit edit = new WALEdit();
+      edit.add(kv);
+      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
+          HConstants.DEFAULT_CLUSTER_ID);
+      writer.append(new WAL.Entry(key, edit));
+      writer.sync(false);
+    }
+    writer.close();
+
+    WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
+    WAL.Entry entry = reader.next();
+    assertNotNull(entry);
+
+    Path oldLogPath = new Path(oldLogDir, "log");
+    FS.rename(logPath, oldLogPath);
+
+    entry = reader.next();
+    assertNotNull(entry);
+
+    entry = reader.next();
+    entry = reader.next();
+
+    assertNull(entry);
+    reader.close();
+  }
+
+  /**
+   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly
+   */
+  @Test
+  public void testTerminateTimeout() throws Exception {
+    ReplicationSource source = new ReplicationSource();
+    ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
+      @Override
+      protected void doStart() {
+        notifyStarted();
+      }
+
+      @Override
+      protected void doStop() {
+        // not calling notifyStopped() here causes the caller of stop() to get a Future that never
+        // completes
+      }
+    };
+    replicationEndpoint.start();
+    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+    Configuration testConf = HBaseConfiguration.create();
+    testConf.setInt("replication.source.maxretriesmultiplier", 1);
+    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
+    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
+      p -> OptionalLong.empty(), null);
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<?> future = executor.submit(new Runnable() {
+
+      @Override
+      public void run() {
+        source.terminate("testing source termination");
+      }
+    });
+    long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
+    Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return future.isDone();
+      }
+    });
+  }
+
+  /**
+   * Tests that recovered queues are preserved on a regionserver shutdown.
+   * See HBASE-18192
+   */
+  @Test
+  public void testServerShutdownRecoveredQueue() throws Exception {
+    try {
+      // Ensure single-threaded WAL
+      conf.set("hbase.wal.provider", "defaultProvider");
+      conf.setInt("replication.sleep.before.failover", 2000);
+      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
+      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
+      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
+      TEST_UTIL_PEER.startMiniCluster(1);
+
+      HRegionServer serverA = cluster.getRegionServer(0);
+      final ReplicationSourceManager managerA =
+          ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
+      HRegionServer serverB = cluster.getRegionServer(1);
+      final ReplicationSourceManager managerB =
+          ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
+      final Admin admin = TEST_UTIL.getAdmin();
+
+      final String peerId = "TestPeer";
+      admin.addReplicationPeer(peerId,
+        ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
+      // Wait for replication sources to come up
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
+        }
+      });
+      // Disabling peer makes sure there is at least one log to claim when the server dies
+      // The recovered queue will also stay there until the peer is disabled even if the
+      // WALs it contains have no data.
+      admin.disableReplicationPeer(peerId);
+
+      // Stopping serverA
+      // It's queues should be claimed by the only other alive server i.e. serverB
+      cluster.stopRegionServer(serverA.getServerName());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerB.getOldSources().size() == 1;
+        }
+      });
+
+      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
+      serverC.waitForServerOnline();
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return serverC.getReplicationSourceService() != null;
+        }
+      });
+      final ReplicationSourceManager managerC =
+          ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
+      // Sanity check
+      assertEquals(0, managerC.getOldSources().size());
+
+      // Stopping serverB
+      // Now serverC should have two recovered queues:
+      // 1. The serverB's normal queue
+      // 2. serverA's recovered queue on serverB
+      cluster.stopRegionServer(serverB.getServerName());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerC.getOldSources().size() == 2;
+        }
+      });
+      admin.enableReplicationPeer(peerId);
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerC.getOldSources().size() == 0;
+        }
+      });
+    } finally {
+      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
+    }
+  }
+
+  /**
+   * Regionserver implementation that adds a delay on the graceful shutdown.
+   */
+  public static class ShutdownDelayRegionServer extends HRegionServer {
+    public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
+      super(conf);
+    }
+
+    @Override
+    protected void stopServiceThreads() {
+      // Add a delay before service threads are shutdown.
+      // This will keep the zookeeper connection alive for the duration of the delay.
+      LOG.info("Adding a delay to the regionserver shutdown");
+      try {
+        Thread.sleep(2000);
+      } catch (InterruptedException ex) {
+        LOG.error("Interrupted while sleeping");
+      }
+      super.stopServiceThreads();
+    }
+  }
+
+  // Test HBASE-20497
+  @Test
+  public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
+    String walGroupId = "fake-wal-group-id";
+    ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
+    ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
+    PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
+    queue.put(new Path("/www/html/test"));
+    RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
+    Server server = Mockito.mock(Server.class);
+    Mockito.when(server.getServerName()).thenReturn(serverName);
+    Mockito.when(source.getServer()).thenReturn(server);
+    Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
+    ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
+    Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
+        .thenReturn(1001L);
+    Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
+        .thenReturn(-1L);
+    conf.setInt("replication.source.maxretriesmultiplier", -1);
+    RecoveredReplicationSourceShipper shipper =
+        new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
+    Assert.assertEquals(1001L, shipper.getStartPosition());
+    conf.unset("replication.source.maxretriesmultiplier");
+  }
+}
+