You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/09/20 01:03:14 UTC

[hbase] branch HBASE-24666 updated: HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-24666 by this push:
     new 2325696  HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364)
2325696 is described below

commit 232569664a5576ebfb920e2143f40b888718b547
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Sun Sep 20 09:02:53 2020 +0800

    HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364)
    
    Signed-off-by: meiyi <my...@gmail.com>
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |   2 +
 .../hbase/replication/ReplicationListener.java     |   2 +-
 .../replication/ReplicationSourceController.java   |  31 +++--
 .../regionserver/RecoveredReplicationSource.java   |  18 ++-
 .../regionserver/ReplicationSource.java            |  32 ++---
 .../regionserver/ReplicationSourceInterface.java   |  25 ++--
 .../regionserver/ReplicationSourceManager.java     | 141 +++++++++++----------
 .../regionserver/ReplicationSourceWALReader.java   |  13 +-
 .../hbase/replication/ReplicationSourceDummy.java  |  21 ++-
 .../regionserver/TestReplicationSourceManager.java |  11 +-
 .../regionserver/TestWALEntryStream.java           |  15 ++-
 11 files changed, 168 insertions(+), 143 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index f6f00c5..d3800b6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -979,6 +979,8 @@ public final class HConstants {
   /*
    * cluster replication constants.
    */
+  public static final String REPLICATION_OFFLOAD_ENABLE_KEY = "hbase.replication.offload.enabled";
+  public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false;
   public static final String
       REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
   public static final String
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
index f040bf9..6ecbb46 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
@@ -33,5 +33,5 @@ public interface ReplicationListener {
    * A region server has been removed from the local cluster
    * @param regionServer the removed region server
    */
-  public void regionServerRemoved(String regionServer);
+  void regionServerRemoved(String regionServer);
 }
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
similarity index 50%
copy from hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
index f040bf9..5bb9dd6 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,20 +17,32 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * The replication listener interface can be implemented if a class needs to subscribe to events
- * generated by the ReplicationTracker. These events include things like addition/deletion of peer
- * clusters or failure of a local region server. To receive events, the class also needs to register
- * itself with a Replication Tracker.
+ * Used to control all replication sources inside one RegionServer or ReplicationServer.
+ * Used by {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSource} or
+ * {@link RecoveredReplicationSource}.
  */
 @InterfaceAudience.Private
-public interface ReplicationListener {
+public interface ReplicationSourceController {
+
+  /**
+   * Returns the maximum size in bytes of edits held in memory which are pending replication
+   * across all sources inside this RegionServer or ReplicationServer.
+   */
+  long getTotalBufferLimit();
+
+  AtomicLong getTotalBufferUsed();
+
+  MetricsReplicationGlobalSourceSource getGlobalMetrics();
 
   /**
-   * A region server has been removed from the local cluster
-   * @param regionServer the removed region server
+   * Call this when the recovered replication source has replicated all WALs.
    */
-  public void regionServerRemoved(String regionServer);
+  void finishRecoveredSource(RecoveredReplicationSource src);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 62685ee..e0b626c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -45,18 +46,15 @@ public class RecoveredReplicationSource extends ReplicationSource {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
 
-  private Path walDir;
-
   private String actualPeerId;
 
   @Override
-  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
-    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-    String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-    MetricsSource metrics) throws IOException {
-    super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
-      clusterId, walFileLengthProvider, metrics);
-    this.walDir = walDir;
+  public void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+    ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId,
+    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+    super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server,
+      peerClusterZnode, clusterId, walFileLengthProvider, metrics);
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 
@@ -149,7 +147,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
   void tryFinish() {
     if (workerThreads.isEmpty()) {
       this.getSourceMetrics().clear();
-      manager.finishRecoveredSource(this);
+      controller.finishRecoveredSource(this);
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index fd9fb31..65420cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
@@ -99,8 +100,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
   protected Configuration conf;
   protected ReplicationQueueInfo replicationQueueInfo;
 
-  // The manager of all sources to which we ping back our progress
-  ReplicationSourceManager manager;
+  protected Path walDir;
+
+  protected ReplicationSourceController controller;
   // Should we stop everything?
   protected Server server;
   // How long should we sleep for each retry
@@ -177,23 +179,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
   }
 
-  /**
-   * Instantiation method used by region servers
-   * @param conf configuration to use
-   * @param fs file system to use
-   * @param manager replication manager to ping to
-   * @param server the server for this region server
-   * @param queueId the id of our replication queue
-   * @param clusterId unique UUID for the cluster
-   * @param metrics metrics for replication source
-   */
   @Override
-  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-      MetricsSource metrics) throws IOException {
+  public void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+    ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
     this.server = server;
     this.conf = HBaseConfiguration.create(conf);
+    this.walDir = walDir;
     this.waitOnEndpointSeconds =
       this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
     decorateConf();
@@ -204,7 +197,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
     this.queueStorage = queueStorage;
     this.replicationPeer = replicationPeer;
-    this.manager = manager;
+    this.controller = overallController;
     this.fs = fs;
     this.metrics = metrics;
     this.clusterId = clusterId;
@@ -217,6 +210,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     currentBandwidth = getCurrentBandwidth();
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
     this.walFileLengthProvider = walFileLengthProvider;
+
     LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
       replicationPeer.getId(), this.currentBandwidth);
   }
@@ -734,9 +728,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
       throttler.addPushSize(batchSize);
     }
     totalReplicatedEdits.addAndGet(entries.size());
-    long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
+    long newBufferUsed = controller.getTotalBufferUsed().addAndGet(-batchSize);
     // Record the new buffer usage
-    this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 321edc2..f3bf8a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -44,14 +45,22 @@ public interface ReplicationSourceInterface {
   /**
    * Initializer for the source
    *
-   * @param conf   the configuration to use
-   * @param fs     the file system to use
-   * @param server the server for this region server
-   */
-  void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
-    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-    MetricsSource metrics) throws IOException;
+   * @param conf configuration to use
+   * @param fs file system to use
+   * @param walDir the directory where the WAL is located
+   * @param overallController the overall controller of all replication sources
+   * @param queueStorage the replication queue storage
+   * @param replicationPeer the replication peer
+   * @param server the server which starts and runs this replication source
+   * @param queueId the id of our replication queue
+   * @param clusterId unique UUID for the cluster
+   * @param walFileLengthProvider used to get the WAL length
+   * @param metrics metrics for this replication source
+   */
+  void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+    ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3212697..de9e21f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -92,7 +93,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * </ul>
  */
 @InterfaceAudience.Private
-public class ReplicationSourceManager implements ReplicationListener {
+public class ReplicationSourceManager implements ReplicationListener, ReplicationSourceController {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
   // all the sources that read this RS's logs and every peer only has one replication source
   private final ConcurrentMap<String, ReplicationSourceInterface> sources;
@@ -126,12 +127,6 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   private AtomicLong totalBufferUsed = new AtomicLong();
 
-  // How long should we sleep for each retry when deleting remote wal files for sync replication
-  // peer.
-  private final long sleepForRetries;
-  // Maximum number of retries before taking bold actions when deleting remote wal files for sync
-  // replication peer.
-  private final int maxRetriesMultiplier;
   // Total buffer size on this RegionServer for holding batched edits to be shipped.
   private final long totalBufferLimit;
   private final MetricsReplicationGlobalSourceSource globalMetrics;
@@ -139,6 +134,12 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
 
   /**
+   * When replication offload is enabled, replication sources are not started here and WALs are
+   * only written to the replication queue storage. The sources are started by ReplicationServer.
+   */
+  private final boolean replicationOffload;
+
+  /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param queueStorage the interface for manipulating replication queues
    * @param replicationPeers
@@ -186,12 +187,11 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.latestPaths = new HashMap<>();
     this.replicationForBulkLoadDataEnabled = conf.getBoolean(
       HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
-    this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
-    this.maxRetriesMultiplier =
-      this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
     this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
         HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     this.globalMetrics = globalMetrics;
+    this.replicationOffload = conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY,
+      HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT);
   }
 
   /**
@@ -212,6 +212,47 @@ public class ReplicationSourceManager implements ReplicationListener {
     return this.executor.submit(this::adoptAbandonedQueues);
   }
 
+  @VisibleForTesting
+  @Override
+  public AtomicLong getTotalBufferUsed() {
+    return totalBufferUsed;
+  }
+
+  @Override
+  public long getTotalBufferLimit() {
+    return totalBufferLimit;
+  }
+
+  @Override
+  public void finishRecoveredSource(RecoveredReplicationSource src) {
+    synchronized (oldsources) {
+      if (!removeRecoveredSource(src)) {
+        return;
+      }
+    }
+    LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
+      src.getStats());
+  }
+
+  @Override
+  public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+    return this.globalMetrics;
+  }
+
+  /**
+   * Clear the metrics and related replication queue of the specified old source
+   * @param src source to clear
+   */
+  private boolean removeRecoveredSource(ReplicationSourceInterface src) {
+    if (!this.oldsources.remove(src)) {
+      return false;
+    }
+    LOG.info("Done with the recovered queue {}", src.getQueueId());
+    // Delete queue from storage and memory
+    deleteQueue(src.getQueueId());
+    return true;
+  }
+
   private void adoptAbandonedQueues() {
     List<ServerName> currentReplicators = null;
     try {
@@ -331,8 +372,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param peerId the id of the replication peer
    * @return the source that was created
    */
-  @VisibleForTesting
-  ReplicationSourceInterface addSource(String peerId) throws IOException {
+  void addSource(String peerId) throws IOException {
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
     ReplicationSourceInterface src = createSource(peerId, peer);
     // synchronized on latestPaths to avoid missing the new log
@@ -354,8 +394,9 @@ public class ReplicationSourceManager implements ReplicationListener {
     if (peerConfig.isSyncReplication()) {
       syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
     }
-    src.startup();
-    return src;
+    if (!replicationOffload) {
+      src.startup();
+    }
   }
 
   /**
@@ -373,7 +414,11 @@ public class ReplicationSourceManager implements ReplicationListener {
    * </p>
    * @param peerId the id of the sync replication peer
    */
-  public void drainSources(String peerId) throws IOException, ReplicationException {
+  void drainSources(String peerId) throws IOException, ReplicationException {
+    if (replicationOffload) {
+      throw new ReplicationException(
+        "Should not add use sync replication when replication offload enabled");
+    }
     String terminateMessage = "Sync replication peer " + peerId +
       " is transiting to STANDBY. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -430,7 +475,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * replication queue storage and only to enqueue all logs to the new replication source
    * @param peerId the id of the replication peer
    */
-  public void refreshSources(String peerId) throws ReplicationException, IOException {
+  void refreshSources(String peerId) throws ReplicationException, IOException {
     String terminateMessage = "Peer " + peerId +
       " state or config changed. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -447,7 +492,9 @@ public class ReplicationSourceManager implements ReplicationListener {
         .forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
     }
     LOG.info("Startup replication source for " + src.getPeerId());
-    src.startup();
+    if (!replicationOffload) {
+      src.startup();
+    }
 
     List<ReplicationSourceInterface> toStartup = new ArrayList<>();
     // synchronized on oldsources to avoid race with NodeFailoverWorker
@@ -470,41 +517,18 @@ public class ReplicationSourceManager implements ReplicationListener {
         toStartup.add(recoveredReplicationSource);
       }
     }
-    for (ReplicationSourceInterface replicationSource : toStartup) {
-      replicationSource.startup();
-    }
-  }
-
-  /**
-   * Clear the metrics and related replication queue of the specified old source
-   * @param src source to clear
-   */
-  private boolean removeRecoveredSource(ReplicationSourceInterface src) {
-    if (!this.oldsources.remove(src)) {
-      return false;
-    }
-    LOG.info("Done with the recovered queue {}", src.getQueueId());
-    // Delete queue from storage and memory
-    deleteQueue(src.getQueueId());
-    return true;
-  }
-
-  void finishRecoveredSource(ReplicationSourceInterface src) {
-    synchronized (oldsources) {
-      if (!removeRecoveredSource(src)) {
-        return;
+    if (!replicationOffload) {
+      for (ReplicationSourceInterface replicationSource : toStartup) {
+        replicationSource.startup();
       }
     }
-    LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
-      src.getStats());
   }
 
   /**
    * Clear the metrics and related replication queue of the specified old source
    * @param src source to clear
    */
-  void removeSource(ReplicationSourceInterface src) {
-    LOG.info("Done with the queue " + src.getQueueId());
+  private void removeSource(ReplicationSourceInterface src) {
     this.sources.remove(src.getPeerId());
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
@@ -548,8 +572,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  // public because of we call it in TestReplicationEmptyWALRecovery
-  @VisibleForTesting
+  @InterfaceAudience.Private
   public void preLogRoll(Path newLog) throws IOException {
     String logName = newLog.getName();
     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
@@ -567,9 +590,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  // public because of we call it in TestReplicationEmptyWALRecovery
-  @VisibleForTesting
-  public void postLogRoll(Path newLog) throws IOException {
+  @InterfaceAudience.Private
+  public void postLogRoll(Path newLog) {
     // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources.values()) {
       source.enqueueLog(newLog);
@@ -739,7 +761,9 @@ public class ReplicationSourceManager implements ReplicationListener {
               LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId());
               src.enqueueLog(new Path(oldLogDir, wal));
             }
-            src.startup();
+            if (!replicationOffload) {
+              src.startup();
+            }
           }
         } catch (IOException e) {
           // TODO manage it
@@ -849,19 +873,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  @VisibleForTesting
-  public AtomicLong getTotalBufferUsed() {
-    return totalBufferUsed;
-  }
-
-  /**
-   * Returns the maximum size in bytes of edits held in memory which are pending replication
-   * across all sources inside this RegionServer.
-   */
-  public long getTotalBufferLimit() {
-    return totalBufferLimit;
-  }
-
   /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived
@@ -967,10 +978,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     return executor.getActiveCount();
   }
 
-  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
-    return this.globalMetrics;
-  }
-
   @InterfaceAudience.Private
   Server getServer() {
     return this.server;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 22cbd97..7b7d0d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -267,10 +267,11 @@ class ReplicationSourceWALReader extends Thread {
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
     // try not to go over total quota
-    if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) {
+    if (source.controller.getTotalBufferUsed().get() > source.controller
+      .getTotalBufferLimit()) {
       LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
-        this.source.getPeerId(), source.manager.getTotalBufferUsed().get(),
-        source.manager.getTotalBufferLimit());
+        this.source.getPeerId(), source.controller.getTotalBufferUsed().get(),
+        source.controller.getTotalBufferLimit());
       Threads.sleep(sleepForRetries);
       return false;
     }
@@ -399,10 +400,10 @@ class ReplicationSourceWALReader extends Thread {
    * @return true if we should clear buffer and push all
    */
   private boolean acquireBufferQuota(long size) {
-    long newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size);
+    long newBufferUsed = source.controller.getTotalBufferUsed().addAndGet(size);
     // Record the new buffer usage
-    source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
-    return newBufferUsed >= source.manager.getTotalBufferLimit();
+    source.controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    return newBufferUsed >= source.controller.getTotalBufferLimit();
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index b75a7ed..66059c7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -40,21 +39,21 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   private ReplicationPeer replicationPeer;
-  private String peerClusterId;
+  private String queueId;
   private Path currentPath;
   private MetricsSource metrics;
   private WALFileLengthProvider walFileLengthProvider;
   private AtomicBoolean startup = new AtomicBoolean(false);
 
   @Override
-  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
-    ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
-    UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
-    throws IOException {
-    this.peerClusterId = peerClusterId;
+  public void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+    ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+    this.queueId = queueId;
     this.metrics = metrics;
     this.walFileLengthProvider = walFileLengthProvider;
-    this.replicationPeer = rp;
+    this.replicationPeer = replicationPeer;
   }
 
   @Override
@@ -96,14 +95,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   @Override
   public String getQueueId() {
-    return peerClusterId;
+    return queueId;
   }
 
   @Override
   public String getPeerId() {
-    String[] parts = peerClusterId.split("-", 2);
+    String[] parts = queueId.split("-", 2);
     return parts.length != 1 ?
-        parts[0] : peerClusterId;
+        parts[0] : queueId;
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 4b685ce..0e0353f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -43,6 +43,7 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -816,10 +818,11 @@ public abstract class TestReplicationSourceManager {
 
   static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
 
-    @Override public void init(Configuration conf, FileSystem fs, Path walDir,
-      ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp,
-      Server server, String peerClusterId, UUID clusterId,
-      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+    @Override
+    public void init(Configuration conf, FileSystem fs, Path walDir,
+      ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+      ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException{
       throw new IOException("Failing deliberately");
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 9410604..bafabb0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -375,19 +376,19 @@ public class TestWALEntryStream {
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
-    source.manager = mockReplicationSourceManager();
+    source.controller = mockReplicationSourceController();
     return source;
   }
 
-  private ReplicationSourceManager mockReplicationSourceManager() {
-    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+  private ReplicationSourceController mockReplicationSourceController() {
+    ReplicationSourceController controller = Mockito.mock(ReplicationSourceController.class);
     MetricsReplicationGlobalSourceSource globalMetrics =
       Mockito.mock(MetricsReplicationGlobalSourceSource.class);
-    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
-    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    when(mockSourceManager.getTotalBufferLimit())
+    when(controller.getGlobalMetrics()).thenReturn(globalMetrics);
+    when(controller.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    when(controller.getTotalBufferLimit())
       .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
-    return mockSourceManager;
+    return controller;
   }
 
   private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {