You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/02/19 18:16:09 UTC

[hbase] branch branch-1 updated: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted (#2960)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new f9a9148  HBASE-25583: Handle the running replication source gracefully with replication nodes deleted (#2960)
f9a9148 is described below

commit f9a91488b2c39320bed502619bf7adb765c79de6
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Fri Feb 19 10:15:45 2021 -0800

    HBASE-25583: Handle the running replication source gracefully with replication nodes deleted (#2960)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../hbase/replication/ReplicationQueues.java       |   2 +-
 .../hbase/replication/ReplicationQueuesZKImpl.java |  50 ++++--
 .../ReplicationSourceWithoutPeerException.java     |  39 +++++
 .../regionserver/ReplicationSource.java            |  21 ++-
 .../regionserver/ReplicationSourceManager.java     |  38 +++--
 .../hbase/replication/ReplicationSourceDummy.java  |   1 +
 .../ReplicationSourceDummyWithNoTermination.java   |  26 +++
 .../hbase/replication/TestReplicationSource.java   |  80 ++++++++-
 .../replication/TestReplicationStateBasic.java     |  21 ++-
 .../replication/TestReplicationStateZKImpl.java    |   2 +-
 .../regionserver/TestReplicationSourceBase.java    | 150 +++++++++++++++++
 .../regionserver/TestReplicationSourceManager.java | 180 ++++-----------------
 ...tReplicationSourceWithoutReplicationZnodes.java |  85 ++++++++++
 .../regionserver/helper/DummyServer.java           |  95 +++++++++++
 14 files changed, 594 insertions(+), 196 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index ccc7172..05ae794 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -59,7 +59,7 @@ public interface ReplicationQueues {
    * @param queueId a String that identifies the queue.
    * @param filename name of the WAL
    */
-  void removeLog(String queueId, String filename);
+  void removeLog(String queueId, String filename) throws ReplicationSourceWithoutPeerException;
 
   /**
    * Set the current position for a specific WAL in a given queue.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 4121fa3..83e3680 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -25,19 +25,19 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -132,17 +132,44 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
   }
 
   @Override
-  public void removeLog(String queueId, String filename) {
+  public void removeLog(String queueId, String filename)
+    throws ReplicationSourceWithoutPeerException {
     try {
-      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
-      znode = ZKUtil.joinZNode(znode, filename);
-      ZKUtil.deleteNode(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
-          + filename + ")", e);
+      try {
+        String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+        znode = ZKUtil.joinZNode(znode, filename);
+        ZKUtil.deleteNode(this.zookeeper, znode);
+      } catch (KeeperException.NoNodeException e) {
+        // in case of no node exception we should not crash the region server
+        // but instead check if the replication peer has been removed.
+        // If so, we can throw here so that the source can terminate itself.
+        // This situation can occur when the replication peer znodes have been
+        // removed but the sources were not terminated because the zk delete watcher missed it.
+        if (!doesPeerExist(queueId)) {
+          LOG.warn("Replication peer " + queueId + " has been removed", e);
+          throw new ReplicationSourceWithoutPeerException(
+            "Znodes for peer has been delete while a source is still active", e);
+        } else {
+          throw e;
+        }
+      }
+    } catch (KeeperException ke) {
+      this.abortable.abort(
+        "Failed to remove wal from queue (queueId=" + queueId + ", filename=" + filename + ")", ke);
     }
   }
 
+  private boolean doesPeerExist(String queueId) throws KeeperException {
+    String peerId = queueId;
+    if (peerId.contains("-")) {
+      // queueId will be in the form peerId + "-" + rsZNode.
+      // A peerId will not have "-" in its name, see HBASE-11394
+      peerId = queueId.split("-")[0];
+    }
+
+    return peerExists(peerId);
+  }
+
   @Override
   public void setLogPosition(String queueId, String filename, long position) {
     try {
@@ -426,8 +453,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
         // add delete op for peer
         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
 
-        if (LOG.isTraceEnabled())
+        if (LOG.isTraceEnabled()) {
           LOG.trace(" The multi list size is: " + listOfOps.size());
+        }
       }
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
 
@@ -506,7 +534,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
   }
 
   /**
-   * @param lockOwner
+   * @param lockOwner lock owner
    * @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix prepended suitable
    *         for use as content of an replication lock during region server fail over.
    */
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceWithoutPeerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceWithoutPeerException.java
new file mode 100644
index 0000000..a07d0c0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceWithoutPeerException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This exception is thrown when the replication source is running with no
+ * corresponding peer. This can be due to a race condition between the PeersWatcher
+ * zk listener and the source trying to remove the queues, or if the zk listener for
+ * delete node was never invoked for any reason. See HBASE-25583
+ */
+@InterfaceAudience.Private
+public class ReplicationSourceWithoutPeerException extends ReplicationException {
+  private static final long serialVersionUID = 1L;
+
+  public ReplicationSourceWithoutPeerException(String m, Throwable t) {
+    super(m, t);
+  }
+
+  public ReplicationSourceWithoutPeerException(String m) {
+    super(m);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index f27c53d..7be880d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
@@ -158,7 +159,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
    * @param clusterId unique UUID for the cluster
    * @param replicationEndpoint the replication endpoint implementation
    * @param metrics metrics for replication source
-   * @throws IOException
+   * @throws IOException IO Exception
    */
   @Override
   public void init(final Configuration conf, final FileSystem fs,
@@ -441,7 +442,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   @Override
   public Path getCurrentPath() {
     for (ReplicationSourceShipperThread worker : workerThreads.values()) {
-      if (worker.getCurrentPath() != null) return worker.getCurrentPath();
+      if (worker.getCurrentPath() != null) {
+        return worker.getCurrentPath();
+      }
     }
     return null;
   }
@@ -460,7 +463,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     return 0;
   }
 
-  private boolean isSourceActive() {
+  public boolean isSourceActive() {
     return !this.stopper.isStopped() && this.sourceRunning;
   }
 
@@ -792,9 +795,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     }
 
     private void updateLogPosition(long lastReadPosition) {
-      manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
-        this.replicationQueueInfo.isQueueRecovered(), false);
-      lastLoggedPosition = lastReadPosition;
+      try {
+        manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
+          this.replicationQueueInfo.isQueueRecovered(), false);
+        lastLoggedPosition = lastReadPosition;
+      } catch (ReplicationSourceWithoutPeerException re) {
+        source.terminate("Replication peer is removed and source should terminate", re);
+      }
     }
 
     public void startup() {
@@ -976,7 +983,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
 
     /**
      * Set the worker state
-     * @param state
+     * @param state the state of the wal reader
      */
     public void setWorkerState(WorkerState state) {
       this.state = state;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 859a2e4..d9435a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
@@ -125,14 +126,14 @@ public class ReplicationSourceManager implements ReplicationListener {
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param replicationQueues the interface for manipulating replication queues
-   * @param replicationPeers
-   * @param replicationTracker
+   * @param replicationPeers the replication peers maintenance class
+   * @param replicationTracker the replication tracker to track the states
    * @param conf the configuration to use
    * @param server the server for this region server
    * @param fs the file system to use
    * @param logDir the directory that contains all wal directories of live RSs
    * @param oldLogDir the directory where old logs are archived
-   * @param clusterId
+   * @param clusterId the cluster id of the source cluster
    */
   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
@@ -181,14 +182,14 @@ public class ReplicationSourceManager implements ReplicationListener {
    * wal it belongs to and will log, for this region server, the current
    * position. It will also clean old logs from the queue.
    * @param log Path to the log currently being replicated from
-   * replication status in zookeeper. It will also delete older entries.
+   *            replication status in zookeeper. It will also delete older entries.
    * @param id id of the peer cluster
    * @param position current location in the log
    * @param queueRecovered indicates if this queue comes from another region server
    * @param holdLogInZK if true then the log is retained in ZK
    */
   public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
-    boolean queueRecovered, boolean holdLogInZK) {
+      boolean queueRecovered, boolean holdLogInZK) throws ReplicationSourceWithoutPeerException {
     String fileName = log.getName();
     this.replicationQueues.setLogPosition(id, fileName, position);
     if (holdLogInZK) {
@@ -204,7 +205,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param id id of the peer cluster
    * @param queueRecovered Whether this is a recovered queue
    */
-  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
+  public void cleanOldLogs(String key, String id, boolean queueRecovered)
+      throws ReplicationSourceWithoutPeerException {
     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
     if (queueRecovered) {
       Map<String, SortedSet<String>> walsForPeer = walsByIdRecoveredQueues.get(id);
@@ -222,9 +224,10 @@ public class ReplicationSourceManager implements ReplicationListener {
         }
       }
     }
- }
+  }
 
-  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
+  private void cleanOldLogs(SortedSet<String> wals, String key, String id)
+      throws ReplicationSourceWithoutPeerException {
     SortedSet<String> walSet = wals.headSet(key);
     LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
     for (String wal : walSet) {
@@ -267,7 +270,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * need to enqueue the latest log of each wal group and do replication
    * @param id the id of the peer cluster
    * @return the source that was created
-   * @throws IOException
+   * @throws IOException IO Exception
    */
   protected ReplicationSourceInterface addSource(String id) throws IOException,
       ReplicationException {
@@ -365,7 +368,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   /**
    * Get the normal source for a given peer
-   * @param peerId
+   * @param peerId the replication peer Id
    * @return the normal source for the give peer if it exists, otherwise null.
    */
   public ReplicationSourceInterface getSource(String peerId) {
@@ -402,7 +405,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Check and enqueue the given log to the correct source. If there's still no source for the
    * group to which the given log belongs, create one
    * @param logPath the log path to check and enqueue
-   * @throws IOException
+   * @throws IOException IO Exception
    */
   private void recordLog(Path logPath) throws IOException {
     String logName = logPath.getName();
@@ -467,7 +470,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param server the server object for this region server
    * @param peerId the id of the peer cluster
    * @return the created source
-   * @throws IOException
+   * @throws IOException IO Exception
    */
   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
       final FileSystem fs, final ReplicationSourceManager manager,
@@ -523,7 +526,8 @@ public class ReplicationSourceManager implements ReplicationListener {
       clusterId, replicationEndpoint, metrics);
 
     // init replication endpoint
-    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
+    replicationEndpoint.init(new ReplicationEndpoint.Context(
+      conf, replicationPeer.getConfiguration(),
       fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
 
     return src;
@@ -535,7 +539,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * znodes and finally will delete the old znodes.
    *
    * It creates one old source for any type of source of the old rs.
-   * @param rsZnode
+   * @param rsZnode znode for region server from where to transfer the queues
    */
   private void transferQueues(String rsZnode) {
     NodeFailoverWorker transfer =
@@ -664,7 +668,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     private final UUID clusterId;
 
     /**
-     * @param rsZnode
+     * @param rsZnode znode for dead region server
      */
     public NodeFailoverWorker(String rsZnode) {
       this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
@@ -820,7 +824,9 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Get the ReplicationPeers used by this ReplicationSourceManager
    * @return the ReplicationPeers used by this ReplicationSourceManager
    */
-  public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
+  public ReplicationPeers getReplicationPeers() {
+    return this.replicationPeers;
+  }
 
   /**
    * Get a string representation of all the sources' metrics
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index a5c61f2..45704ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -44,6 +44,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   String peerClusterId;
   Path currentPath;
   MetricsSource metrics;
+  public static final String fakeExceptionMessage = "Fake Exception";
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java
new file mode 100644
index 0000000..4a89917
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+public class ReplicationSourceDummyWithNoTermination extends ReplicationSourceDummy {
+
+  @Override
+  public void terminate(String reason) {
+    // This prevents the zk listener from closing the queues,
+    // to simulate the znodes getting deleted without the zk listener getting invoked
+    throw new RuntimeException(fakeExceptionMessage);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 84d2d8b..f85a52b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -104,7 +105,7 @@ public class TestReplicationSource {
   private static Configuration conf = TEST_UTIL.getConfiguration();
 
   /**
-   * @throws java.lang.Exception
+   * @throws java.lang.Exception exception
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -112,9 +113,13 @@ public class TestReplicationSource {
     FS = TEST_UTIL.getDFSCluster().getFileSystem();
     Path rootDir = TEST_UTIL.createRootDir();
     oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true);
+    if (FS.exists(oldLogDir)) {
+      FS.delete(oldLogDir, true);
+    }
     logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
-    if (FS.exists(logDir)) FS.delete(logDir, true);
+    if (FS.exists(logDir)) {
+      FS.delete(logDir, true);
+    }
   }
 
   @Before
@@ -154,7 +159,7 @@ public class TestReplicationSource {
    * Sanity check that we can move logs around while we are reading
    * from them. Should this test fail, ReplicationSource would have a hard
    * time reading logs that are being archived.
-   * @throws Exception
+   * @throws Exception exception
    */
   @Test
   public void testLogMoving() throws Exception{
@@ -277,6 +282,14 @@ public class TestReplicationSource {
       when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed);
     }
 
+    // source manager throws the exception while cleaning logs
+    private void setReplicationSourceWithoutPeerException()
+      throws ReplicationSourceWithoutPeerException {
+      doThrow(new ReplicationSourceWithoutPeerException("No peer")).when(manager)
+        .logPositionAndCleanOldLogs(Mockito.<Path>anyObject(), Mockito.anyString(),
+          Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
+    }
+
     ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint)
             throws IOException {
       final ReplicationSource source = new ReplicationSource();
@@ -471,6 +484,65 @@ public class TestReplicationSource {
   }
 
   /**
+   * There can be a scenario of replication peer removed but the replication source
+   * still running since termination of source depends upon zk listener and there
+   * can be a rare scenario where the zk listener might not get invoked or might be delayed.
+   * In that case, replication source manager will throw since it won't be able
+   * to remove the znode while removing the log. We should terminate the source
+   * in that case. See HBASE-25583
+   * @throws Exception any exception
+   */
+  @Test
+  public void testReplicationSourceTerminationWhenNoZnodeForPeerAndQueues() throws Exception {
+    Mocks mocks = new Mocks();
+    mocks.setReplicationSourceWithoutPeerException();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+      Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    // Append 3 entries in a log
+    final Path log1 = new Path(logDir, "log.1");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+
+    // Replication end point with no filter
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    source.run();
+    source.enqueueLog(log1);
+
+    // Wait for source to replicate
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.replicateCount.get() == 1;
+      }
+    });
+
+    // Wait for all the entries to get replicated
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.lastEntries.size() == 3;
+      }
+    });
+
+    // After that the source should be terminated
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        // wait until the source is no longer active
+        return !source.isSourceActive();
+      }
+    });
+  }
+
+  /**
    * Tests that recovered queues are preserved on a regionserver shutdown.
    * See HBASE-18192
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index c9cf6ab..f91c3a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.hbase.replication;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -53,7 +57,6 @@ public abstract class TestReplicationStateBasic {
   protected static String KEY_TWO;
 
   // For testing when we try to replicate to ourself
-  protected String OUR_ID = "3";
   protected String OUR_KEY;
 
   protected static int zkTimeoutCount;
@@ -119,7 +122,6 @@ public abstract class TestReplicationStateBasic {
     // 3 replicators should exist
     assertEquals(3, rq1.getListOfReplicators().size());
     rq1.removeQueue("bogus");
-    rq1.removeLog("bogus", "bogus");
     rq1.removeAllQueues();
     assertNull(rq1.getAllQueues());
     assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
@@ -167,6 +169,19 @@ public abstract class TestReplicationStateBasic {
   }
 
   @Test
+  public void testLogRemovalWithNoZnode() throws ReplicationException {
+    rq1.init(server1);
+    Exception expectedException = null;
+    try {
+      rq1.removeLog("bogus", "bogus");
+    } catch (ReplicationException e) {
+      expectedException = e;
+    }
+
+    assertTrue(expectedException instanceof ReplicationSourceWithoutPeerException);
+  }
+
+  @Test
   public void testInvalidClusterKeys() throws ReplicationException, KeeperException {
     rp.init();
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index d6bf4ea..f8877dc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -31,10 +31,10 @@ import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
new file mode 100644
index 0000000..e94985e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.apache.hadoop.hbase.replication.ReplicationSourceDummy.fakeExceptionMessage;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/** Common setup for replication source tests: mini ZooKeeper, peer "1" znodes, source manager. */
+public abstract class TestReplicationSourceBase {
+  private static final Log LOG = LogFactory.getLog(TestReplicationSourceBase.class);
+
+  protected static Configuration conf;
+  protected static HBaseTestingUtility utility;
+  protected static Replication replication;
+  protected static ReplicationSourceManager manager;
+  protected static ZooKeeperWatcher zkw;
+  protected static HTableDescriptor htd;
+  protected static HRegionInfo hri;
+
+  protected static final byte[] r1 = Bytes.toBytes("r1");
+  protected static final byte[] r2 = Bytes.toBytes("r2");
+  protected static final byte[] f1 = Bytes.toBytes("f1");
+  protected static final byte[] f2 = Bytes.toBytes("f2");
+  protected static final TableName test = TableName.valueOf("test");
+  protected static final String slaveId = "1";
+  protected static FileSystem fs;
+  protected static Path oldLogDir;
+  protected static Path logDir;
+  protected static DummyServer server;
+
+  @BeforeClass public static void setUpBeforeClass() throws Exception {
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+    // Seed ZooKeeper: peer "1" with its cluster key and enabled state, plus the global state.
+    zkw = new ZooKeeperWatcher(conf, "test", null);
+    ZKUtil.createWithParents(zkw, "/hbase/replication");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1", Bytes.toBytes(
+      conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
+        + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    // Register a cluster id, set the root dir, then build Replication and fetch its manager.
+    ZKClusterId.setClusterId(zkw, new ClusterId());
+    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME);
+    server = new DummyServer(conf, "example.hostname.com", zkw);
+    replication = new Replication(server, fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+    manager.addSource(slaveId);
+    // Table "test": family f1 has global replication scope, f2 has local scope.
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor(f1);
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor(f2);
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+  }
+
+  @AfterClass public static void tearDownAfterClass() throws Exception {
+    try {
+      manager.join();
+    } catch (RuntimeException re) {
+      // getMessage() may be null; only the known fake failure is expected, the rest is reported.
+      if (re.getMessage() != null && re.getMessage().equals(fakeExceptionMessage)) {
+        LOG.info("It is fine");
+      } else {
+        LOG.warn("Unexpected failure while joining the replication manager", re);
+      }
+    } finally {
+      utility.shutdownMiniCluster();
+    }
+  }
+
+  @Rule public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @Before public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  @After public void tearDown() throws Exception {
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 297bc09..f0c18d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -47,9 +47,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -58,11 +56,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@@ -78,6 +74,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
+import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -86,61 +83,25 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.junit.After;
+
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
 
 @Category(MediumTests.class)
-public class TestReplicationSourceManager {
+public class TestReplicationSourceManager extends TestReplicationSourceBase {
 
   private static final Log LOG =
-      LogFactory.getLog(TestReplicationSourceManager.class);
-
-  private static Configuration conf;
-
-  private static HBaseTestingUtility utility;
-
-  private static Replication replication;
-
-  private static ReplicationSourceManager manager;
-
-  private static ZooKeeperWatcher zkw;
-
-  private static HTableDescriptor htd;
-
-  private static HRegionInfo hri;
-
-  private static final byte[] r1 = Bytes.toBytes("r1");
-
-  private static final byte[] r2 = Bytes.toBytes("r2");
-
-  private static final byte[] f1 = Bytes.toBytes("f1");
-
-  private static final byte[] f2 = Bytes.toBytes("f2");
-
+    LogFactory.getLog(TestReplicationSourceManager.class);
   private static final TableName test =
       TableName.valueOf("test");
-
   private static final String slaveId = "1";
-
-  private static FileSystem fs;
-
-  private static Path oldLogDir;
-
-  private static Path logDir;
-
   private static CountDownLatch latch;
-
-  private static List<String> files = new ArrayList<String>();
+  private static List<String> files = new ArrayList<>();
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -174,7 +135,8 @@ public class TestReplicationSourceManager {
         HConstants.HREGION_OLDLOGDIR_NAME);
     logDir = new Path(utility.getDataTestDir(),
         HConstants.HREGION_LOGDIR_NAME);
-    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+    server = new DummyServer(conf, "example.hostname.com", zkw);
+    replication = new Replication(server, fs, logDir, oldLogDir);
     manager = replication.getReplicationManager();
 
     manager.addSource(slaveId);
@@ -196,26 +158,6 @@ public class TestReplicationSourceManager {
     utility.shutdownMiniCluster();
   }
 
-  @Rule
-  public TestName testName = new TestName();
-
-  private void cleanLogDir() throws IOException {
-    fs.delete(logDir, true);
-    fs.delete(oldLogDir, true);
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    LOG.info("Start " + testName.getMethodName());
-    cleanLogDir();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    LOG.info("End " + testName.getMethodName());
-    cleanLogDir();
-  }
-
   @Test
   public void testLogRoll() throws Exception {
     long baseline = 1000;
@@ -288,7 +230,7 @@ public class TestReplicationSourceManager {
   @Test
   public void testClaimQueues() throws Exception {
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
-    final Server server = new DummyServer("hostname0.example.org");
+    final Server server = new DummyServer(conf, "hostname0.example.org", zkw);
     ReplicationQueues rq =
         ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
           server);
@@ -300,9 +242,9 @@ public class TestReplicationSourceManager {
       rq.addLog("1", file);
     }
     // create 3 DummyServers
-    Server s1 = new DummyServer("dummyserver1.example.org");
-    Server s2 = new DummyServer("dummyserver2.example.org");
-    Server s3 = new DummyServer("dummyserver3.example.org");
+    Server s1 = new DummyServer(conf, "dummyserver1.example.org", zkw);
+    Server s2 = new DummyServer(conf, "dummyserver2.example.org", zkw);
+    Server s3 = new DummyServer(conf, "dummyserver3.example.org", zkw);
 
     // create 3 DummyNodeFailoverWorkers
     DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
@@ -329,7 +271,7 @@ public class TestReplicationSourceManager {
 
   @Test
   public void testCleanupFailoverQueues() throws Exception {
-    final Server server = new DummyServer("hostname1.example.org");
+    final Server server = new DummyServer(conf, "hostname1.example.org", zkw);
     ReplicationQueues rq =
         ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
           server);
@@ -344,7 +286,7 @@ public class TestReplicationSourceManager {
     for (String file : files) {
       rq.addLog("1", file);
     }
-    Server s1 = new DummyServer("dummyserver1.example.org");
+    Server s1 = new DummyServer(conf, "dummyserver1.example.org", zkw);
     ReplicationQueues rq1 =
         ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
     rq1.init(s1.getServerName().toString());
@@ -368,7 +310,7 @@ public class TestReplicationSourceManager {
   public void testNodeFailoverDeadServerParsing() throws Exception {
     LOG.debug("testNodeFailoverDeadServerParsing");
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
-    final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
+    final Server server = new DummyServer(conf, "ec2-54-234-230-108.compute-1.amazonaws.com", zkw);
     ReplicationQueues repQueues =
         ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
     repQueues.init(server.getServerName().toString());
@@ -380,9 +322,9 @@ public class TestReplicationSourceManager {
     }
 
     // create 3 DummyServers
-    Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
-    Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
-    Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
+    Server s1 = new DummyServer(conf, "ip-10-8-101-114.ec2.internal", zkw);
+    Server s2 = new DummyServer(conf, "ec2-107-20-52-47.compute-1.amazonaws.com", zkw);
+    Server s3 = new DummyServer(conf, "ec2-23-20-187-167.compute-1.amazonaws.com", zkw);
 
     // simulate three servers fail sequentially
     ReplicationQueues rq1 =
@@ -423,7 +365,7 @@ public class TestReplicationSourceManager {
     LOG.debug("testFailoverDeadServerCversionChange");
 
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
-    final Server s0 = new DummyServer("cversion-change0.example.org");
+    final Server s0 = new DummyServer(conf, "cversion-change0.example.org", zkw);
     ReplicationQueues repQueues =
         ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
     repQueues.init(s0.getServerName().toString());
@@ -434,7 +376,7 @@ public class TestReplicationSourceManager {
       repQueues.addLog("1", file);
     }
     // simulate queue transfer
-    Server s1 = new DummyServer("cversion-change1.example.org");
+    Server s1 = new DummyServer(conf, "cversion-change1.example.org", zkw);
     ReplicationQueues rq1 =
         ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
     rq1.init(s1.getServerName().toString());
@@ -459,7 +401,7 @@ public class TestReplicationSourceManager {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RU_INVOKE_RUN",
     justification="Intended")
   public void testCleanupUnknownPeerZNode() throws Exception {
-    final Server server = new DummyServer("hostname2.example.org");
+    final Server server = new DummyServer(conf, "hostname2.example.org", zkw);
     ReplicationQueues rq =
         ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
           server);
@@ -521,16 +463,15 @@ public class TestReplicationSourceManager {
    * corresponding ReplicationSourceInterface correctly cleans up the corresponding
    * replication queue and ReplicationPeer.
    * See HBASE-16096.
-   * @throws Exception
+   * @throws Exception exception
    */
   @Test
-  public void testPeerRemovalCleanup() throws Exception{
+  public void testPeerRemovalCleanup() throws Exception {
     String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
     final String peerId = "FakePeer";
     final ReplicationPeerConfig peerConfig =
         new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
     try {
-      DummyServer server = new DummyServer();
       final ReplicationQueues rq =
           ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
               server);
@@ -600,10 +541,10 @@ public class TestReplicationSourceManager {
 
   /**
    * Add a peer and wait for it to initialize
-   * @param peerId
-   * @param peerConfig
+   * @param peerId the replication peer Id
+   * @param peerConfig the replication peer config
    * @param waitForSource Whether to wait for replication source to initialize
-   * @throws Exception
+   * @throws Exception exception
    */
   private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
       final boolean waitForSource) throws Exception {
@@ -622,8 +563,8 @@ public class TestReplicationSourceManager {
 
   /**
    * Remove a peer and wait for it to get cleaned up
-   * @param peerId
-   * @throws Exception
+   * @param peerId the replication peer Id
+   * @throws Exception exception
    */
   private void removePeerAndWait(final String peerId) throws Exception {
     final ReplicationPeers rp = manager.getReplicationPeers();
@@ -639,7 +580,6 @@ public class TestReplicationSourceManager {
     });
   }
 
-
   private WALEdit getBulkLoadWALEdit() {
     // 1. Create store files for the families
     Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
@@ -742,70 +682,4 @@ public class TestReplicationSourceManager {
       throw new IOException("Failing deliberately");
     }
   }
-
-  static class DummyServer implements Server {
-    String hostname;
-
-    DummyServer() {
-      hostname = "hostname.example.org";
-    }
-
-    DummyServer(String hostname) {
-      this.hostname = hostname;
-    }
-
-    @Override
-    public Configuration getConfiguration() {
-      return conf;
-    }
-
-    @Override
-    public ZooKeeperWatcher getZooKeeper() {
-      return zkw;
-    }
-
-    @Override
-    public CoordinatedStateManager getCoordinatedStateManager() {
-      return null;
-    }
-    @Override
-    public ClusterConnection getConnection() {
-      return null;
-    }
-
-    @Override
-    public MetaTableLocator getMetaTableLocator() {
-      return null;
-    }
-
-    @Override
-    public ServerName getServerName() {
-      return ServerName.valueOf(hostname, 1234, 1L);
-    }
-
-    @Override
-    public void abort(String why, Throwable e) {
-      // To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    @Override
-    public boolean isAborted() {
-      return false;
-    }
-
-    @Override
-    public void stop(String why) {
-      // To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    @Override
-    public boolean isStopped() {
-      return false; // To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    @Override
-    public ChoreService getChoreService() {
-      return null;
-    }
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
new file mode 100644
index 0000000..095710d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes extends TestReplicationSourceBase {
+
+  /**
+   * Removing a peer deletes its znodes, and a ZooKeeper watcher then terminates the matching
+   * replication sources. If that watcher is never invoked, or races with a source that is
+   * deleting a log znode, the source can run into a NoNode error. The manager is expected to
+   * surface this as ReplicationSourceWithoutPeerException so the source can be terminated;
+   * this test deletes the peer and queue znodes by hand and checks for that exception.
+   * @throws Exception on any unexpected WAL, ZooKeeper or replication failure
+   */
+  @Test
+  public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception {
+    // Create a WAL that notifies the shared Replication listener, then initialise the manager.
+    List<WALActionsListener> walListeners = new ArrayList<>();
+    walListeners.add(replication);
+    String walId = URLEncoder.encode("regionserver:60020", "UTF8");
+    final WALFactory factory = new WALFactory(utility.getConfiguration(), walListeners, walId);
+    final WAL log = factory.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
+    manager.init();
+
+    // Append a single edit, sync it and roll the writer.
+    WALEdit walEdit = new WALEdit();
+    walEdit.add(new KeyValue(r1, f1, r1));
+    WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(),
+      new MultiVersionConcurrencyControl());
+    log.sync(log.append(htd, hri, walKey, walEdit, true));
+    log.rollWriter();
+
+    // Delete the peer znode and the queue znode under rs/<server>/1 behind the source's back.
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
+    String queueZnode = "/hbase/replication/rs/" + server.getServerName() + "/1";
+    ZKUtil.deleteNodeRecursively(zkw, queueZnode);
+
+    // With the znodes gone, recording the log position must fail with the dedicated exception.
+    boolean missingPeerReported = false;
+    try {
+      manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
+        "1", 0, false, false);
+    } catch (ReplicationException ex) {
+      missingPeerReported = ex instanceof ReplicationSourceWithoutPeerException;
+    }
+    Assert.assertTrue(missingPeerReported);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/helper/DummyServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/helper/DummyServer.java
new file mode 100644
index 0000000..59cf162
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/helper/DummyServer.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver.helper;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * Stub {@link Server} for tests, backed by a given configuration, host name and ZK watcher.
+ */
+public class DummyServer implements Server {
+  private final Configuration conf;
+  private final String hostname;
+  private final ZooKeeperWatcher zkw;
+
+  public DummyServer(Configuration conf, String hostname, ZooKeeperWatcher zkw) {
+    this.conf = conf;
+    this.hostname = hostname;
+    this.zkw = zkw;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public ZooKeeperWatcher getZooKeeper() {
+    return zkw;
+  }
+
+  @Override
+  public CoordinatedStateManager getCoordinatedStateManager() {
+    return null; // not provided by this stub
+  }
+
+  @Override
+  public ClusterConnection getConnection() {
+    return null; // not provided by this stub
+  }
+
+  @Override
+  public MetaTableLocator getMetaTableLocator() {
+    return null; // not provided by this stub
+  }
+
+  @Override
+  public ServerName getServerName() {
+    return ServerName.valueOf(hostname, 1234, 1L); // fixed port and start code
+  }
+
+  @Override
+  public void abort(String why, Throwable e) { /* intentionally a no-op */ }
+
+  @Override
+  public boolean isAborted() {
+    return false;
+  }
+
+  @Override
+  public void stop(String why) { /* intentionally a no-op */ }
+
+  @Override
+  public boolean isStopped() {
+    return false;
+  }
+
+  @Override
+  public ChoreService getChoreService() {
+    return null; // not provided by this stub
+  }
+}