Posted to commits@hbase.apache.org by su...@apache.org on 2021/02/23 01:57:43 UTC

[hbase] 03/10: HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs into ReplicationSource (#2064)

This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit d4bcf8d9357e3b4e9984255742505649d892681b
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Tue Aug 11 20:07:09 2020 +0800

    HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs into ReplicationSource (#2064)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../regionserver/CatalogReplicationSource.java     |  13 +-
 .../regionserver/RecoveredReplicationSource.java   |  18 ++-
 .../regionserver/ReplicationSource.java            | 166 ++++++++++++++++++---
 .../regionserver/ReplicationSourceInterface.java   |  39 +++--
 .../regionserver/ReplicationSourceManager.java     | 147 ++----------------
 .../regionserver/ReplicationSourceShipper.java     |  21 +--
 .../regionserver/ReplicationSourceWALReader.java   |  16 +-
 .../replication/regionserver/WALEntryBatch.java    |   2 +-
 .../hbase/replication/ReplicationSourceDummy.java  |  24 +--
 .../regionserver/TestReplicationSource.java        |  16 +-
 .../regionserver/TestReplicationSourceManager.java |  50 ++++---
 .../regionserver/TestWALEntryStream.java           |  20 ++-
 12 files changed, 276 insertions(+), 256 deletions(-)
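
For orientation before the full diff: the net effect on ReplicationSourceInterface,
condensed from the hunks below. A sketch only -- javadoc and unrelated members are
elided, and the method bodies live in ReplicationSource.

    public interface ReplicationSourceInterface {
      // init now also receives the WAL directory, so a source no longer needs the
      // manager to locate its logs.
      void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
        ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
        String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
        MetricsSource metrics) throws IOException;

      // These two replace the removed getSourceManager() accessor and the default
      // logPositionAndCleanOldLogs(entryBatch), which delegated to the manager.
      void setWALPosition(WALEntryBatch entryBatch);

      void cleanOldWALs(String walName, boolean inclusive);
    }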

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
index 8cb7860..15370e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
@@ -35,7 +35,18 @@ class CatalogReplicationSource extends ReplicationSource {
   }
 
   @Override
-  public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
+  public void setWALPosition(WALEntryBatch entryBatch) {
+    // Noop. This CatalogReplicationSource implementation does not persist state to backing storage
+    // nor does it keep its WALs in a general map up in ReplicationSourceManager --
+    // CatalogReplicationSource is used by the Catalog Read Replica feature which resets every time
+    // the WAL source process crashes. Skip calling through to the default implementation.
+    // See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
+    // design doc attached to HBASE-18070 'Enable memstore replication for meta replica' for
+    // background on why there is no need to keep WAL state.
+  }
+
+  @Override
+  public void cleanOldWALs(String log, boolean inclusive) {
     // Noop. This CatalogReplicationSource implementation does not persist state to backing storage
     // nor does it keep its WALs in a general map up in ReplicationSourceManager --
     // CatalogReplicationSource is used by the Catalog Read Replica feature which resets everytime
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 526c3e3..abbc046 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.PriorityBlockingQueue;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,15 +45,18 @@ public class RecoveredReplicationSource extends ReplicationSource {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
 
+  private Path walDir;
+
   private String actualPeerId;
 
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-      MetricsSource metrics) throws IOException {
-    super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
+  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+    String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+    MetricsSource metrics) throws IOException {
+    super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
       clusterId, walFileLengthProvider, metrics);
+    this.walDir = walDir;
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 
@@ -93,7 +97,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
               deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
           for (Path possibleLogLocation : locs) {
             LOG.info("Possible location " + possibleLogLocation.toUri().toString());
-            if (manager.getFs().exists(possibleLogLocation)) {
+            if (this.fs.exists(possibleLogLocation)) {
               // We found the right new location
               LOG.info("Log " + path + " still exists at " + possibleLogLocation);
               newPaths.add(possibleLogLocation);
@@ -126,7 +130,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
   // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
   // area rather than to the wal area for a particular region server.
   private Path getReplSyncUpPath(Path path) throws IOException {
-    FileStatus[] rss = fs.listStatus(manager.getLogDir());
+    FileStatus[] rss = fs.listStatus(walDir);
     for (FileStatus rs : rss) {
       Path p = rs.getPath();
       FileStatus[] logs = fs.listStatus(p);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index fdf7d89..a592a79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -26,7 +26,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +36,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -50,15 +53,19 @@ import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,7 +96,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   protected ReplicationQueueInfo replicationQueueInfo;
 
   // The manager of all sources to which we ping back our progress
-  protected ReplicationSourceManager manager;
+  ReplicationSourceManager manager;
   // Should we stop everything?
   protected Server server;
   // How long should we sleep for each retry
@@ -133,8 +140,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
   protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
       new ConcurrentHashMap<>();
 
-  private AtomicLong totalBufferUsed;
-
   public static final String WAIT_ON_ENDPOINT_SECONDS =
     "hbase.replication.wait.on.endpoint.seconds";
   public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
@@ -186,7 +191,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
    * @param metrics metrics for replication source
    */
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
       ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
       String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
       MetricsSource metrics) throws IOException {
@@ -215,7 +220,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
     currentBandwidth = getCurrentBandwidth();
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
-    this.totalBufferUsed = manager.getTotalBufferUsed();
     this.walFileLengthProvider = walFileLengthProvider;
 
     this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
@@ -386,11 +390,11 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }
 
   private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
-    return replicationPeer.getPeerConfig().isSerial()
-      ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
-      this, walGroupId)
-      : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
-      this, walGroupId);
+    return replicationPeer.getPeerConfig().isSerial() ?
+      new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
+        this, walGroupId) :
+      new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
+        this, walGroupId);
   }
 
   /**
@@ -416,7 +420,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
             t.getName());
           manager.refreshSources(peerId);
           break;
-        } catch (IOException e1) {
+        } catch (ReplicationException | IOException e1) {
           LOG.error("Replication sources refresh failed.", e1);
           sleepForRetries("Sleeping before try refreshing sources again",
             maxRetriesMultiplier);
@@ -431,11 +435,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }
 
   @Override
-  public ReplicationSourceManager getSourceManager() {
-    return this.manager;
-  }
-
-  @Override
   public void tryThrottle(int batchSize) throws InterruptedException {
     checkBandwidthChangeAndResetThrottler();
     if (throttler.isEnabled()) {
@@ -752,7 +751,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       throttler.addPushSize(batchSize);
     }
     totalReplicatedEdits.addAndGet(entries.size());
-    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
+    long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
     // Record the new buffer usage
     this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
@@ -788,4 +787,137 @@ public class ReplicationSource implements ReplicationSourceInterface {
   public String logPeerId(){
     return "peerId=" + this.getPeerId() + ",";
   }
+
+  @Override
+  public void setWALPosition(WALEntryBatch entryBatch) {
+    String fileName = entryBatch.getLastWalPath().getName();
+    interruptOrAbortWhenFail(() -> this.queueStorage
+      .setWALPosition(server.getServerName(), getQueueId(), fileName,
+        entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
+  }
+
+  @Override
+  public void cleanOldWALs(String log, boolean inclusive) {
+    NavigableSet<String> walsToRemove = getWalsToRemove(log, inclusive);
+    if (walsToRemove.isEmpty()) {
+      return;
+    }
+    // cleanOldWALs may spend some time, especially for sync replication where we may want to
+    // remove remote wals as the remote cluster may already be down, so we do it outside
+    // the lock to avoid blocking preLogRoll
+    cleanOldWALs(walsToRemove);
+  }
+
+  private NavigableSet<String> getWalsToRemove(String log, boolean inclusive) {
+    NavigableSet<String> walsToRemove = new TreeSet<>();
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
+    try {
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), getQueueId()).forEach(wal -> {
+        LOG.debug("getWalsToRemove wal {}", wal);
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        if (walPrefix.equals(logPrefix)) {
+          walsToRemove.add(wal);
+        }
+      });
+    } catch (ReplicationException e) {
+      // Just log the exception here, as the recovered replication source will try to clean up again.
+      LOG.warn("Failed to read wals in queue {}", getQueueId(), e);
+    }
+    return walsToRemove.headSet(log, inclusive);
+  }
+
+  private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
+    throws IOException {
+    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
+    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+    for (String wal : wals) {
+      Path walFile = new Path(remoteWALDirForPeer, wal);
+      try {
+        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
+          throw new IOException("Can not delete " + walFile);
+        }
+      } catch (FileNotFoundException e) {
+        // Just ignore since this means the file has already been deleted.
+        // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting a
+        // nonexistent file, so here we deal with both cases, i.e., check the return value of
+        // FileSystem.delete, and also catch FNFE.
+        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
+      }
+    }
+  }
+
+  private void cleanOldWALs(NavigableSet<String> wals) {
+    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
+    // The intention here is that we want to delete the remote wal files ASAP, as delaying this may
+    // affect the failover time if you want to transition the remote cluster from S to A. And the
+    // infinite retry is not a problem: if we cannot contact the remote HDFS cluster, then usually
+    // we cannot contact the HBase cluster either, so replication will be blocked anyway.
+    if (isSyncReplication()) {
+      String peerId = getPeerId();
+      String remoteWALDir = replicationPeer.getPeerConfig().getRemoteWALDir();
+      // Filter out the wals that need to be removed from the remote directory. Their names should
+      // be in the special format, and the peer id in each name should match the peer id of the
+      // replication source.
+      List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
+        .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
+        .collect(Collectors.toList());
+      LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
+        remoteWALDir, remoteWals);
+      if (!remoteWals.isEmpty()) {
+        for (int sleepMultiplier = 0;;) {
+          try {
+            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
+            break;
+          } catch (IOException e) {
+            LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
+              peerId);
+          }
+          if (!isSourceActive()) {
+            // skip the following operations
+            return;
+          }
+          if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
+            sleepMultiplier, maxRetriesMultiplier)) {
+            sleepMultiplier++;
+          }
+        }
+      }
+    }
+    for (String wal : wals) {
+      interruptOrAbortWhenFail(
+        () -> this.queueStorage.removeWAL(server.getServerName(), getQueueId(), wal));
+    }
+  }
+
+  public void cleanUpHFileRefs(List<String> files) {
+    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(getPeerId(), files));
+  }
+
+  @FunctionalInterface
+  private interface ReplicationQueueOperation {
+    void exec() throws ReplicationException;
+  }
+
+  /**
+   * Refreshing a replication source will terminate the old source first, so the source thread will
+   * be interrupted. We need to handle the interruption instead of aborting the region server.
+   */
+  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
+    try {
+      op.exec();
+    } catch (ReplicationException e) {
+      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
+        && e.getCause().getCause() != null && e.getCause()
+        .getCause() instanceof InterruptedException) {
+        // A ReplicationRuntimeException (a RuntimeException) is thrown here. The reason is that
+        // the thread was interrupted deep down in the stack; it should bypass the following
+        // processing logic and propagate to the topmost layer, which can handle this exception
+        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
+        throw new ReplicationRuntimeException(
+          "Thread is interrupted, the replication source may be terminated",
+          e.getCause().getCause());
+      }
+      server.abort("Failed to operate on replication queue", e);
+    }
+  }
 }
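
The cleanup above relies on two properties of WAL names: names in one WAL group share
a prefix (getWALPrefixFromWALName), and the timestamp suffix makes lexicographic order
match creation order, so the TreeSet.headSet(log, inclusive) call in getWalsToRemove
yields exactly the WALs at or before the one just shipped. A minimal standalone
illustration of that step -- the WAL names here are hypothetical:

    import java.util.NavigableSet;
    import java.util.TreeSet;

    public class HeadSetDemo {
      public static void main(String[] args) {
        // Hypothetical WAL names: same group prefix, millisecond-timestamp suffix,
        // so lexicographic order is creation order.
        NavigableSet<String> walsInQueue = new TreeSet<>();
        walsInQueue.add("rs1%2C16020%2C1.1600000001000");
        walsInQueue.add("rs1%2C16020%2C1.1600000002000");
        walsInQueue.add("rs1%2C16020%2C1.1600000003000");
        // After shipping the second WAL to its end (inclusive = true), it and the
        // older WAL are safe to drop from the replication queue; the newest stays.
        System.out.println(walsInQueue.headSet("rs1%2C16020%2C1.1600000002000", true));
        // prints [rs1%2C16020%2C1.1600000001000, rs1%2C16020%2C1.1600000002000]
      }
    }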
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 352cdd3..77bba90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -41,15 +41,15 @@ import org.apache.yetus.audience.InterfaceAudience;
 public interface ReplicationSourceInterface {
   /**
    * Initializer for the source
-   * @param conf the configuration to use
-   * @param fs the file system to use
-   * @param manager the manager to use
+   *
+   * @param conf   the configuration to use
+   * @param fs     the file system to use
    * @param server the server for this region server
    */
-  void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-      MetricsSource metrics) throws IOException;
+  void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+    MetricsSource metrics) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate
@@ -146,11 +146,6 @@ public interface ReplicationSourceInterface {
   ReplicationEndpoint getReplicationEndpoint();
 
   /**
-   * @return the replication source manager
-   */
-  ReplicationSourceManager getSourceManager();
-
-  /**
    * @return the wal file length provider
    */
   WALFileLengthProvider getWALFileLengthProvider();
@@ -196,14 +191,16 @@ public interface ReplicationSourceInterface {
   ReplicationQueueStorage getReplicationQueueStorage();
 
   /**
-   * Log the current position to storage. Also clean old logs from the replication queue.
-   * Use to bypass the default call to
-   * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface,
-   * WALEntryBatch)} whem implementation does not need to persist state to backing storage.
-   * @param entryBatch the wal entry batch we just shipped
-   * @return The instance of queueStorage used by this ReplicationSource.
+   * Set the current WAL position in {@link ReplicationQueueStorage}.
+   * @param entryBatch the batch of WAL entries we just shipped
    */
-  default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
-    getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
-  }
+  void setWALPosition(WALEntryBatch entryBatch);
+
+  /**
+   * Cleans a WAL and all older WALs from the replication queue. Called when we are sure that a
+   * WAL is closed and has no more entries.
+   * @param walName the name of WAL
+   * @param inclusive whether we should also remove the given WAL
+   */
+  void cleanOldWALs(String walName, boolean inclusive);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 00ee6a5..717b1ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -17,10 +17,8 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -42,7 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -64,17 +61,14 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -339,7 +333,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     WALFileLengthProvider walFileLengthProvider =
       this.walFactory.getWALProvider() != null?
         this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> OptionalLong.empty();
-    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
+    src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, queueId, clusterId,
       walFileLengthProvider, new MetricsSource(queueId));
     return src;
   }
@@ -542,29 +536,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     void exec() throws ReplicationException;
   }
 
-  /**
-   * Refresh replication source will terminate the old source first, then the source thread will be
-   * interrupted. Need to handle it instead of abort the region server.
-   */
-  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
-    try {
-      op.exec();
-    } catch (ReplicationException e) {
-      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
-          && e.getCause().getCause() != null && e.getCause()
-          .getCause() instanceof InterruptedException) {
-        // ReplicationRuntimeException(a RuntimeException) is thrown out here. The reason is
-        // that thread is interrupted deep down in the stack, it should pass the following
-        // processing logic and propagate to the most top layer which can handle this exception
-        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
-        throw new ReplicationRuntimeException(
-          "Thread is interrupted, the replication source may be terminated",
-          e.getCause().getCause());
-      }
-      server.abort("Failed to operate on replication queue", e);
-    }
-  }
-
   private void abortWhenFail(ReplicationQueueOperation op) {
     try {
       op.exec();
@@ -590,106 +561,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  /**
-   * This method will log the current position to storage. And also clean old logs from the
-   * replication queue.
-   * @param source the replication source
-   * @param entryBatch the wal entry batch we just shipped
-   */
-  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
-      WALEntryBatch entryBatch) {
-    String fileName = entryBatch.getLastWalPath().getName();
-    interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(),
-      source.getQueueId(), fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
-    cleanOldLogs(fileName, entryBatch.isEndOfFile(), source);
-  }
-
-  /**
-   * Cleans a log file and all older logs from replication queue. Called when we are sure that a log
-   * file is closed and has no more entries.
-   * @param log Path to the log
-   * @param inclusive whether we should also remove the given log file
-   * @param source the replication source
-   */
-  void cleanOldLogs(String log, boolean inclusive,
-    ReplicationSourceInterface source) {
-    NavigableSet<String> walsToRemove;
-    synchronized (this.latestPaths) {
-      walsToRemove = getWalsToRemove(source.getQueueId(), log, inclusive);
-    }
-    if (walsToRemove.isEmpty()) {
-      return;
-    }
-    // cleanOldLogs may spend some time, especially for sync replication where we may want to
-    // remove remote wals as the remote cluster may have already been down, so we do it outside
-    // the lock to avoid block preLogRoll
-    cleanOldLogs(walsToRemove, source);
-  }
-
-  private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
-      throws IOException {
-    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
-    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
-    for (String wal : wals) {
-      Path walFile = new Path(remoteWALDirForPeer, wal);
-      try {
-        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
-          throw new IOException("Can not delete " + walFile);
-        }
-      } catch (FileNotFoundException e) {
-        // Just ignore since this means the file has already been deleted.
-        // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting an
-        // inexistent file, so here we deal with both, i.e, check the return value of the
-        // FileSystem.delete, and also catch FNFE.
-        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
-      }
-    }
-  }
-
-  private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) {
-    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
-    // The intention here is that, we want to delete the remote wal files ASAP as it may effect the
-    // failover time if you want to transit the remote cluster from S to A. And the infinite retry
-    // is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can
-    // not contact with the HBase cluster either, so the replication will be blocked either.
-    if (source.isSyncReplication()) {
-      String peerId = source.getPeerId();
-      String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
-      // Filter out the wals need to be removed from the remote directory. Its name should be the
-      // special format, and also, the peer id in its name should match the peer id for the
-      // replication source.
-      List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
-        .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
-        .collect(Collectors.toList());
-      LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
-        remoteWALDir, remoteWals);
-      if (!remoteWals.isEmpty()) {
-        for (int sleepMultiplier = 0;;) {
-          try {
-            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
-            break;
-          } catch (IOException e) {
-            LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
-              peerId);
-          }
-          if (!source.isSourceActive()) {
-            // skip the following operations
-            return;
-          }
-          if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
-            sleepMultiplier, maxRetriesMultiplier)) {
-            sleepMultiplier++;
-          }
-        }
-      }
-    }
-    String queueId = source.getQueueId();
-    for (String wal : wals) {
-      interruptOrAbortWhenFail(
-        () -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal));
-    }
-  }
-
   // public because of we call it in TestReplicationEmptyWALRecovery
   public void preLogRoll(Path newLog) throws IOException {
     String logName = newLog.getName();
@@ -1098,10 +969,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  public void cleanUpHFileRefs(String peerId, List<String> files) {
-    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
-  }
-
   int activeFailoverTaskCount() {
     return executor.getActiveCount();
   }
@@ -1110,6 +977,16 @@ public class ReplicationSourceManager implements ReplicationListener {
     return this.globalMetrics;
   }
 
+  @InterfaceAudience.Private
+  Server getServer() {
+    return this.server;
+  }
+
+  @InterfaceAudience.Private
+  ReplicationQueueStorage getQueueStorage() {
+    return this.queueStorage;
+  }
+
   /**
    * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
    * Create it once only. If exists already, use the existing one.
@@ -1161,7 +1038,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
       this.clusterId.toString());
     final ReplicationSourceInterface crs = new CatalogReplicationSource();
-    crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
+    crs.init(conf, fs, logDir, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
       clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
     // Add listener on the provider so we can pick up the WAL to replicate on roll.
     WALActionsListener listener = new WALActionsListener() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 098ba02..16f4994 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -239,12 +239,6 @@ public class ReplicationSourceShipper extends Thread {
   }
 
   private void cleanUpHFileRefs(WALEdit edit) throws IOException {
-    String peerId = source.getPeerId();
-    if (peerId.contains("-")) {
-      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
-      // A peerId will not have "-" in its name, see HBASE-11394
-      peerId = peerId.split("-")[0];
-    }
     List<Cell> cells = edit.getCells();
     int totalCells = cells.size();
     for (int i = 0; i < totalCells; i++) {
@@ -255,7 +249,7 @@ public class ReplicationSourceShipper extends Thread {
         int totalStores = stores.size();
         for (int j = 0; j < totalStores; j++) {
           List<String> storeFileList = stores.get(j).getStoreFileList();
-          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
+          source.cleanUpHFileRefs(storeFileList);
           source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
         }
       }
@@ -267,10 +261,11 @@ public class ReplicationSourceShipper extends Thread {
     // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
     // record on zk, so let's call it. The last wal position maybe zero if end of file is true and
     // there is no entry in the batch. It is OK because that the queue storage will ignore the zero
-    // position and the file will be removed soon in cleanOldLogs.
-    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
-      batch.getLastWalPosition() != currentPosition) {
-      source.logPositionAndCleanOldLogs(batch);
+    // position and the file will be removed soon in cleanOldWALs.
+    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
+      || batch.getLastWalPosition() != currentPosition) {
+      source.setWALPosition(batch);
+      source.cleanOldWALs(batch.getLastWalPath().getName(), batch.isEndOfFile());
       updated = true;
     }
     // if end of file is true, then we can just skip to the next file in queue.
@@ -372,8 +367,8 @@ public class ReplicationSourceShipper extends Thread {
       LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
         totalToDecrement.longValue());
     }
-    long newBufferUsed = source.getSourceManager().getTotalBufferUsed()
+    long newBufferUsed = source.manager.getTotalBufferUsed()
       .addAndGet(-totalToDecrement.longValue());
-    source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 301a9e8..fdbc7ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -25,7 +25,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -75,8 +74,6 @@ class ReplicationSourceWALReader extends Thread {
   //Indicates whether this particular worker is running
   private boolean isReaderRunning = true;
 
-  private AtomicLong totalBufferUsed;
-  private long totalBufferQuota;
   private final String walGroupId;
 
   /**
@@ -104,8 +101,6 @@ class ReplicationSourceWALReader extends Thread {
     // memory used will be batchSizeCapacity * (nb.batches + 1)
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
-    this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
-    this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
     this.maxRetriesMultiplier =
@@ -283,9 +278,10 @@ class ReplicationSourceWALReader extends Thread {
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
     // try not to go over total quota
-    if (totalBufferUsed.get() > totalBufferQuota) {
+    if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) {
       LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
-          this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
+        this.source.getPeerId(), source.manager.getTotalBufferUsed().get(),
+        source.manager.getTotalBufferLimit());
       Threads.sleep(sleepForRetries);
       return false;
     }
@@ -414,10 +410,10 @@ class ReplicationSourceWALReader extends Thread {
    * @return true if we should clear buffer and push all
    */
   private boolean acquireBufferQuota(long size) {
-    long newBufferUsed = totalBufferUsed.addAndGet(size);
+    long newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size);
     // Record the new buffer usage
-    this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
-    return newBufferUsed >= totalBufferQuota;
+    source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    return newBufferUsed >= source.manager.getTotalBufferLimit();
   }
 
   /**
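
The reader-side changes in this hunk drop the cached totalBufferUsed and
totalBufferQuota fields and always go through source.manager, so every reader and
shipper sees the same global counter. A minimal sketch of that shared back-pressure
pattern, with hypothetical names (the real accounting lives in
ReplicationSourceManager):

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch only: one instance shared by all readers/shippers on a region server.
    class BufferQuota {
      private final AtomicLong totalBufferUsed = new AtomicLong(0);
      private final long totalBufferLimit;

      BufferQuota(long totalBufferLimit) {
        this.totalBufferLimit = totalBufferLimit;
      }

      // Reserve size bytes; returns true when the global limit is reached, which
      // tells a reader to ship what it has batched so far (cf. acquireBufferQuota).
      boolean acquire(long size) {
        return totalBufferUsed.addAndGet(size) >= totalBufferLimit;
      }

      // Release bytes once a batch is shipped or discarded (cf. ReplicationSource
      // and ReplicationSourceShipper above).
      long release(long size) {
        return totalBufferUsed.addAndGet(-size);
      }

      // Polled before reading more edits from the WAL (cf. checkQuota).
      boolean overQuota() {
        return totalBufferUsed.get() > totalBufferLimit;
      }
    }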
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 4f96c96..591b44d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * Holds a batch of WAL entries to replicate, along with some statistics
  */
 @InterfaceAudience.Private
-class WALEntryBatch {
+public class WALEntryBatch {
 
   // used by recovered replication queue to indicate that all the entries have been read.
   public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 4f656b1..42445a6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
@@ -38,7 +39,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
-  private ReplicationSourceManager manager;
   private ReplicationPeer replicationPeer;
   private String peerClusterId;
   private Path currentPath;
@@ -47,11 +47,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   private AtomicBoolean startup = new AtomicBoolean(false);
 
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
-      UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
-      throws IOException {
-    this.manager = manager;
+  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+    ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
+    UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+    throws IOException {
     this.peerClusterId = peerClusterId;
     this.metrics = metrics;
     this.walFileLengthProvider = walFileLengthProvider;
@@ -134,11 +133,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
-  public ReplicationSourceManager getSourceManager() {
-    return manager;
-  }
-
-  @Override
   public void tryThrottle(int batchSize) throws InterruptedException {
   }
 
@@ -162,6 +156,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
+  public void setWALPosition(WALEntryBatch entryBatch) {
+  }
+
+  @Override
+  public void cleanOldWALs(String walName, boolean inclusive) {
+  }
+
+  @Override
   public ReplicationPeer getPeer() {
     return replicationPeer;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 86a71c9..94826ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -139,7 +139,7 @@ public class TestReplicationSource {
     String queueId = "qid";
     RegionServerServices rss =
       TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
+    rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
       p -> OptionalLong.empty(), new MetricsSource(queueId));
     try {
       rs.startup();
@@ -173,11 +173,11 @@ public class TestReplicationSource {
       thenReturn(DoNothingReplicationEndpoint.class.getName());
     Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
     ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
-    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     String queueId = "qid";
     RegionServerServices rss =
       TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, manager, null, mockPeer, rss, queueId,
+    rs.init(conf, null, null, manager, null, mockPeer, rss, queueId,
       uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
     try {
       rs.startup();
@@ -265,7 +265,7 @@ public class TestReplicationSource {
       testConf.setInt("replication.source.maxretriesmultiplier", 1);
       ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
       Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
-      source.init(testConf, null, manager, null, mockPeer, null, "testPeer",
+      source.init(testConf, null, null, manager, null, mockPeer, null, "testPeer",
         null, p -> OptionalLong.empty(), null);
       ExecutorService executor = Executors.newSingleThreadExecutor();
       Future<?> future = executor.submit(
@@ -289,7 +289,7 @@ public class TestReplicationSource {
     ReplicationPeer mockPeer = mock(ReplicationPeer.class);
     Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
     Configuration testConf = HBaseConfiguration.create();
-    source.init(testConf, null, mockManager, null, mockPeer, null,
+    source.init(testConf, null, null, mockManager, null, mockPeer, null,
       "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
     ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
       conf, null, 0, null, source, null);
@@ -315,7 +315,7 @@ public class TestReplicationSource {
     reader.addEntryToBatch(batch, mockEntry);
     reader.entryBatchQueue.put(batch);
     source.terminate("test");
-    assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
+    assertEquals(0, source.manager.getTotalBufferUsed().get());
   }
 
   /**
@@ -524,7 +524,7 @@ public class TestReplicationSource {
     String queueId = "qid";
     RegionServerServices rss =
       TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
+    rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
       p -> OptionalLong.empty(), new MetricsSource(queueId));
     return rss;
   }
@@ -624,7 +624,7 @@ public class TestReplicationSource {
         TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
 
       ReplicationSource source = new ReplicationSource();
-      source.init(conf, null, manager, null, mockPeer, rss, id, null,
+      source.init(conf, null, null, manager, null, mockPeer, rss, id, null,
         p -> OptionalLong.empty(), metrics);
 
       final Path log1 = new Path(logDir, "log-walgroup-a.8");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 2e35ef4..597349a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -334,8 +335,9 @@ public abstract class TestReplicationSourceManager {
     when(source.getQueueId()).thenReturn("1");
     when(source.isRecovered()).thenReturn(false);
     when(source.isSyncReplication()).thenReturn(false);
-    manager.logPositionAndCleanOldLogs(source,
-      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
+    WALEntryBatch batch = new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath());
+    source.setWALPosition(batch);
+    source.cleanOldWALs(batch.getLastWalPath().getName(), batch.isEndOfFile());
 
     wal.appendData(hri,
       new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
@@ -409,11 +411,10 @@ public abstract class TestReplicationSourceManager {
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
     assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
-    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
-    when(source.getQueueId()).thenReturn(id);
-    when(source.isRecovered()).thenReturn(true);
-    when(source.isSyncReplication()).thenReturn(false);
-    manager.cleanOldLogs(file2, false, source);
+    ReplicationSourceInterface source = new ReplicationSource();
+    source.init(conf, fs, null, manager, manager.getQueueStorage(), rp1.getPeer("1"),
+      manager.getServer(), id, null, p -> OptionalLong.empty(), null);
+    source.cleanOldWALs(file2, false);
     // log1 should be deleted
     assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
   }
@@ -590,19 +591,15 @@ public abstract class TestReplicationSourceManager {
     }
   }
 
-  private ReplicationSourceInterface mockReplicationSource(String peerId) {
-    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
-    when(source.getPeerId()).thenReturn(peerId);
-    when(source.getQueueId()).thenReturn(peerId);
-    when(source.isRecovered()).thenReturn(false);
-    when(source.isSyncReplication()).thenReturn(true);
+  private ReplicationPeer mockReplicationPeerForSyncReplication(String peerId) {
     ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
     when(config.getRemoteWALDir())
       .thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+    when(config.isSyncReplication()).thenReturn(true);
     ReplicationPeer peer = mock(ReplicationPeer.class);
     when(peer.getPeerConfig()).thenReturn(config);
-    when(source.getPeer()).thenReturn(peer);
-    return source;
+    when(peer.getId()).thenReturn(peerId);
+    return peer;
   }
 
   @Test
@@ -631,13 +628,19 @@ public abstract class TestReplicationSourceManager {
       manager.preLogRoll(wal);
       manager.postLogRoll(wal);
 
-      ReplicationSourceInterface source = mockReplicationSource(peerId2);
-      manager.cleanOldLogs(walName, true, source);
+      ReplicationSourceInterface source = new ReplicationSource();
+      source.init(conf, fs, null, manager, manager.getQueueStorage(),
+        mockReplicationPeerForSyncReplication(peerId2), manager.getServer(), peerId2, null,
+        p -> OptionalLong.empty(), null);
+      source.cleanOldWALs(walName, true);
       // still there if peer id does not match
       assertTrue(fs.exists(remoteWAL));
 
-      source = mockReplicationSource(slaveId);
-      manager.cleanOldLogs(walName, true, source);
+      source = new ReplicationSource();
+      source.init(conf, fs, null, manager, manager.getQueueStorage(),
+        mockReplicationPeerForSyncReplication(slaveId), manager.getServer(), slaveId, null,
+        p -> OptionalLong.empty(), null);
+      source.cleanOldWALs(walName, true);
       assertFalse(fs.exists(remoteWAL));
     } finally {
       removePeerAndWait(peerId2);
@@ -814,11 +817,10 @@ public abstract class TestReplicationSourceManager {
 
   static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
 
-    @Override
-    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-        ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
-        UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
-        throws IOException {
+    @Override public void init(Configuration conf, FileSystem fs, Path walDir,
+      ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp,
+      Server server, String peerClusterId, UUID clusterId,
+      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
       throw new IOException("Failing deliberately");
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 9c6fafc..ca78cf0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -383,23 +383,27 @@ public class TestWALEntryStream {
   }
 
   private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
-    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
-    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    when(mockSourceManager.getTotalBufferLimit()).thenReturn(
-        (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     Server mockServer = Mockito.mock(Server.class);
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
-    when(source.getSourceManager()).thenReturn(mockSourceManager);
     when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
-    MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
-        MetricsReplicationGlobalSourceSource.class);
-    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
+    source.manager = mockReplicationSourceManager();
     return source;
   }
 
+  private ReplicationSourceManager mockReplicationSourceManager() {
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    MetricsReplicationGlobalSourceSource globalMetrics =
+      Mockito.mock(MetricsReplicationGlobalSourceSource.class);
+    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    when(mockSourceManager.getTotalBufferLimit())
+      .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    return mockSourceManager;
+  }
+
   private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
     ReplicationSource source = mockReplicationSource(recovered, conf);
     when(source.isPeerEnabled()).thenReturn(true);