You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/07/09 06:56:03 UTC

[hbase] 09/12: HBASE-25071 ReplicationServer support start ReplicationSource internal (#2452)

This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit c8c85f4f4205ebcd6c7fbecf55072386c3bb842b
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Mon Nov 9 11:46:02 2020 +0800

    HBASE-25071 ReplicationServer support start ReplicationSource internal (#2452)
    
    Signed-off-by: XinSun <dd...@gmail.com>
---
 .../server/replication/ReplicationServer.proto     |  14 +-
 .../replication/ZKReplicationQueueStorage.java     |   4 +-
 .../replication/ZKReplicationStorageBase.java      |   4 +
 .../hadoop/hbase/master/MasterRpcServices.java     |   2 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   2 +-
 .../replication/HBaseReplicationEndpoint.java      |  14 +-
 .../hbase/replication/HReplicationServer.java      | 175 ++++++++++++++++++---
 .../replication/ReplicationServerRpcServices.java  |  15 ++
 .../regionserver/RecoveredReplicationSource.java   |   9 +-
 .../regionserver/ReplicationSource.java            |  54 ++++++-
 .../regionserver/ReplicationSourceFactory.java     |   2 +-
 .../regionserver/ReplicationSourceInterface.java   |   6 +-
 .../regionserver/ReplicationSourceManager.java     |   9 +-
 .../hbase/replication/ReplicationSourceDummy.java  |   5 +-
 .../replication/TestReplicationFetchServers.java   |  43 +++--
 ...nServer.java => TestReplicationServerSink.java} |  25 +--
 .../replication/TestReplicationServerSource.java   |  69 ++++++++
 .../regionserver/TestReplicationSource.java        |  20 +--
 .../regionserver/TestReplicationSourceManager.java |  18 ++-
 19 files changed, 400 insertions(+), 90 deletions(-)

diff --git a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
index ed334c4..925aed4 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
@@ -24,9 +24,21 @@ option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 option optimize_for = SPEED;
 
+import "HBase.proto";
 import "server/region/Admin.proto";
 
+message StartReplicationSourceRequest {
+  required ServerName server_name = 1;
+  required string queue_id = 2;
+}
+
+message StartReplicationSourceResponse {
+}
+
 service ReplicationServerService {
   rpc ReplicateWALEntry(ReplicateWALEntryRequest)
     returns(ReplicateWALEntryResponse);
-}
\ No newline at end of file
+
+  rpc StartReplicationSource(StartReplicationSourceRequest)
+    returns(StartReplicationSourceResponse);
+}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 5c480ba..08ac142 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -79,7 +79,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
  * </pre>
  */
 @InterfaceAudience.Private
-class ZKReplicationQueueStorage extends ZKReplicationStorageBase
+public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
     implements ReplicationQueueStorage {
 
   private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
@@ -121,7 +121,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
     return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
   }
 
-  private String getQueueNode(ServerName serverName, String queueId) {
+  public String getQueueNode(ServerName serverName, String queueId) {
     return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
   }
 
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
index 596167f..a239bf8 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
@@ -74,4 +74,8 @@ public abstract class ZKReplicationStorageBase {
       throw new RuntimeException(e);
     }
   }
+
+  public ZKWatcher getZookeeper() {
+    return this.zookeeper;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index c677458..c17d699 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -3475,7 +3475,7 @@ public class MasterRpcServices extends RSRpcServices implements
       if (master.getMasterCoprocessorHost() != null) {
         master.getMasterCoprocessorHost().preListReplicationSinkServers();
       }
-      builder.addAllServerName(master.listReplicationSinkServers().stream()
+      builder.addAllServerName(master.getReplicationServerManager().getOnlineServersList().stream()
         .map(ProtobufUtil::toServerName).collect(Collectors.toList()));
       if (master.getMasterCoprocessorHost() != null) {
         master.getMasterCoprocessorHost().postListReplicationSinkServers();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 72fea23..91bf9cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -273,7 +273,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
 @SuppressWarnings("deprecation")
 public class RSRpcServices implements HBaseRPCErrorHandler,
     AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
-    ConfigurationObserver, ReplicationServerService.BlockingInterface {
+    ConfigurationObserver {
   private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
 
   /** RPC scheduler to use for the region server. */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 115df76..d17bb7f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -315,6 +315,10 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
       if (!useZk || ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
         useZk = false;
         slaveAddresses = fetchSlavesAddresses();
+        if (slaveAddresses.isEmpty()) {
+          LOG.warn("No sinks available at peer. Try fetch sinks by using zk.");
+          useZk = true;
+        }
       } else {
         useZk = true;
       }
@@ -322,13 +326,15 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
       LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.", ctx.getPeerId(), t);
       useZk = true;
     }
+
     if (useZk) {
       slaveAddresses = fetchSlavesAddressesByZK();
     }
 
     if (slaveAddresses.isEmpty()) {
-      LOG.warn("No sinks available at peer. Will not be able to replicate");
+      LOG.warn("No sinks available at peer. Will not be able to replicate.");
     }
+
     Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
     int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
     synchronized (this) {
@@ -362,10 +368,10 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   }
 
   private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
-    if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
-      return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
-    } else {
+    if (fetchServersUseZk) {
       return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
+    } else {
+      return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index e679a98..2d0336d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -20,10 +20,19 @@ package org.apache.hadoop.hbase.replication;
 import java.io.IOException;
 import java.lang.management.MemoryUsage;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -33,17 +42,30 @@ import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.ReplicationService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.security.SecurityConstants;
+import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -65,7 +87,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatus
  */
 @InterfaceAudience.Private
 @SuppressWarnings({ "deprecation"})
-public class HReplicationServer extends Thread implements Server {
+public class HReplicationServer extends Thread implements Server, ReplicationSourceController  {
 
   private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class);
 
@@ -75,7 +97,7 @@ public class HReplicationServer extends Thread implements Server {
   /**
    * This servers start code.
    */
-  protected final long startCode;
+  private final long startCode;
 
   private volatile boolean stopped = false;
 
@@ -84,7 +106,11 @@ public class HReplicationServer extends Thread implements Server {
   private AtomicBoolean abortRequested;
 
   // flag set after we're done setting up server threads
-  final AtomicBoolean online = new AtomicBoolean(false);
+  private final AtomicBoolean online = new AtomicBoolean(false);
+
+  private final int msgInterval;
+  // A sleeper that sleeps for msgInterval.
+  private final Sleeper sleeper;
 
   /**
    * The server name the Master sees us as.  Its made from the hostname the
@@ -93,18 +119,22 @@ public class HReplicationServer extends Thread implements Server {
    */
   private ServerName serverName;
 
-  protected final Configuration conf;
+  private final Configuration conf;
 
-  private ReplicationSinkService replicationSinkService;
+  // zookeeper connection and watcher
+  private final ZKWatcher zooKeeper;
 
-  final int msgInterval;
-  // A sleeper that sleeps for msgInterval.
-  protected final Sleeper sleeper;
+  private final UUID clusterId;
 
   private final int shortOperationTimeout;
 
-  // zookeeper connection and watcher
-  protected final ZKWatcher zooKeeper;
+  private HFileSystem walFs;
+  private Path walRootDir;
+
+  /**
+   * ChoreService used to schedule tasks that we want to run periodically
+   */
+  private ChoreService choreService;
 
   // master address tracker
   private final MasterAddressTracker masterAddressTracker;
@@ -112,11 +142,23 @@ public class HReplicationServer extends Thread implements Server {
   /**
    * The asynchronous cluster connection to be shared by services.
    */
-  protected AsyncClusterConnection asyncClusterConnection;
+  private AsyncClusterConnection asyncClusterConnection;
 
   private UserProvider userProvider;
 
-  protected final ReplicationServerRpcServices rpcServices;
+  final ReplicationServerRpcServices rpcServices;
+
+  // Total buffer size on this ReplicationServer for holding batched edits to be shipped.
+  private final long totalBufferLimit;
+  private AtomicLong totalBufferUsed = new AtomicLong();
+
+  private final MetricsReplicationGlobalSourceSource globalMetrics;
+  private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
+  private final ConcurrentMap<String, ReplicationSourceInterface> sources =
+    new ConcurrentHashMap<>();
+
+  private final ReplicationQueueStorage queueStorage;
+  private final ReplicationPeers replicationPeers;
 
   // Stub to do region server status calls against the master.
   private volatile ReplicationServerStatusService.BlockingInterface rssStub;
@@ -124,12 +166,9 @@ public class HReplicationServer extends Thread implements Server {
   // RPC client. Used to make the stub above that does region server status checking.
   private RpcClient rpcClient;
 
-  /**
-   * ChoreService used to schedule tasks that we want to run periodically
-   */
-  private ChoreService choreService;
+  private ReplicationSinkService replicationSinkService;
 
-  public HReplicationServer(final Configuration conf) throws IOException {
+  public HReplicationServer(final Configuration conf) throws Exception {
     try {
       this.startCode = System.currentTimeMillis();
       this.conf = conf;
@@ -142,12 +181,29 @@ public class HReplicationServer extends Thread implements Server {
       serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startCode);
 
       this.userProvider = UserProvider.instantiate(conf);
+      // login the zookeeper client principal (if using security)
+      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
+        HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
+      // login the server principal (if using secure Hadoop)
+      this.userProvider.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
+        SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, hostName);
+      // init superusers and add the server principal (if using security)
+      // or process owner as default super user.
+      Superusers.initialize(conf);
 
       this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000);
       this.sleeper = new Sleeper(this.msgInterval, this);
 
       this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
           HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
+      this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+        HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+      this.globalMetrics =
+        CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+          .getGlobalSource();
+
+      initializeFileSystem();
+      this.choreService = new ChoreService(getName(), true);
 
       // Some unit tests don't need a cluster, so no zookeeper at all
       if (!conf.getBoolean("hbase.testing.nocluster", false)) {
@@ -160,6 +216,12 @@ public class HReplicationServer extends Thread implements Server {
         zooKeeper = null;
         masterAddressTracker = null;
       }
+
+      this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zooKeeper, conf);
+      this.replicationPeers =
+        ReplicationFactory.getReplicationPeers(zooKeeper, this.conf);
+      this.replicationPeers.init();
+      this.clusterId = ZKClusterId.getUUIDForCluster(zooKeeper);
       this.rpcServices.start(zooKeeper);
       this.choreService = new ChoreService(getName(), true);
     } catch (Throwable t) {
@@ -170,6 +232,15 @@ public class HReplicationServer extends Thread implements Server {
     }
   }
 
+  private void initializeFileSystem() throws IOException {
+    // Get the WAL fs instance used by this replication server. If hbase checksum verification
+    // is enabled, then automatically switch off hdfs checksum verification.
+    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
+    CommonFSUtils.setFsDefault(this.conf, CommonFSUtils.getWALRootDir(this.conf));
+    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
+    this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
+  }
+
   public String getProcessName() {
     return REPLICATION_SERVER;
   }
@@ -289,6 +360,9 @@ public class HReplicationServer extends Thread implements Server {
     if (this.replicationSinkService != null) {
       this.replicationSinkService.stopReplicationService();
     }
+    if (this.choreService != null) {
+      this.choreService.shutdown();
+    }
   }
 
   @Override
@@ -328,7 +402,7 @@ public class HReplicationServer extends Thread implements Server {
 
   @Override
   public ChoreService getChoreService() {
-    return this.choreService;
+    return choreService;
   }
 
   @Override
@@ -592,4 +666,69 @@ public class HReplicationServer extends Thread implements Server {
     }
     return interrupted;
   }
+
+  @Override
+  public long getTotalBufferLimit() {
+    return this.totalBufferLimit;
+  }
+
+  @Override
+  public AtomicLong getTotalBufferUsed() {
+    return this.totalBufferUsed;
+  }
+
+  @Override
+  public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+    return this.globalMetrics;
+  }
+
+  @Override
+  public void finishRecoveredSource(RecoveredReplicationSource src) {
+    this.sources.remove(src.getQueueId());
+    this.sourceMetrics.remove(src.getQueueId());
+    deleteQueue(src.getQueueId());
+    LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
+      src.getStats());
+  }
+
+  public void startReplicationSource(ServerName producer, String queueId)
+    throws IOException, ReplicationException {
+    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
+    String peerId = replicationQueueInfo.getPeerId();
+    this.replicationPeers.addPeer(peerId);
+    Path walDir =
+      new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(producer.toString()));
+    MetricsSource metrics = new MetricsSource(queueId);
+
+    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
+    // init replication source
+    src.init(conf, walFs, walDir, this, queueStorage, replicationPeers.getPeer(peerId), this,
+      producer, queueId, clusterId, p -> OptionalLong.empty(), metrics);
+    queueStorage.getWALsInQueue(producer, queueId)
+      .forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
+    src.startup();
+    sources.put(queueId, src);
+    sourceMetrics.put(queueId, metrics);
+  }
+
+  /**
+   * Delete a complete queue of wals associated with a replication source
+   * @param queueId the id of replication queue to delete
+   */
+  private void deleteQueue(String queueId) {
+    abortWhenFail(() -> this.queueStorage.removeQueue(getServerName(), queueId));
+  }
+
+  @FunctionalInterface
+  private interface ReplicationQueueOperation {
+    void exec() throws ReplicationException;
+  }
+
+  private void abortWhenFail(ReplicationQueueOperation op) {
+    try {
+      op.exec();
+    } catch (ReplicationException e) {
+      abort("Failed to operate on replication queue", e);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
index 15d4f8c..b8c3884 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
@@ -56,11 +56,14 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.StartReplicationSourceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.StartReplicationSourceResponse;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -321,4 +324,16 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
       throw new ServiceException(ie);
     }
   }
+
+  @Override
+  public StartReplicationSourceResponse startReplicationSource(RpcController controller,
+    StartReplicationSourceRequest request) throws ServiceException {
+    try {
+      replicationServer.startReplicationSource(ProtobufUtil.toServerName(request.getServerName()),
+        request.getQueueId());
+      return StartReplicationSourceResponse.newBuilder().build();
+    } catch (Exception e) {
+      throw new ServiceException(e);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 7cb159e..147556f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -51,10 +51,11 @@ public class RecoveredReplicationSource extends ReplicationSource {
   @Override
   public void init(Configuration conf, FileSystem fs, Path walDir,
     ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
-    ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId,
-    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
-    super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server,
-      peerClusterZnode, clusterId, walFileLengthProvider, metrics);
+    ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+    UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+    throws IOException {
+    super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server, producer,
+      queueId, clusterId, walFileLengthProvider, metrics);
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 27f2ce7..0d9ee4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -62,10 +62,13 @@ import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.zookeeper.ZKListener;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -149,6 +152,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   private int waitOnEndpointSeconds = -1;
 
   private Thread initThread;
+  private Thread fetchWALsThread;
 
   /**
    * WALs to replicate.
@@ -186,8 +190,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
   @Override
   public void init(Configuration conf, FileSystem fs, Path walDir,
     ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
-    ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
-    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+    ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+    UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+    throws IOException {
     this.server = server;
     this.conf = HBaseConfiguration.create(conf);
     this.walDir = walDir;
@@ -219,6 +224,19 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
       true);
 
+    if (conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY,
+      HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT)) {
+      if (queueStorage instanceof ZKReplicationQueueStorage) {
+        ZKReplicationQueueStorage zkQueueStorage = (ZKReplicationQueueStorage) queueStorage;
+        zkQueueStorage.getZookeeper().registerListener(
+          new ReplicationQueueListener(this, zkQueueStorage, producer, queueId, walDir));
+        LOG.info("Register a ZKListener to track the WALs from {}'s replication queue, queueId={}",
+          producer, queueId);
+      } else {
+        throw new UnsupportedOperationException(
+          "hbase.replication.offload.enabled=true only support ZKReplicationQueueStorage");
+      }
+    }
     LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
       replicationPeer.getId(), this.currentBandwidth);
   }
@@ -928,4 +946,36 @@ public class ReplicationSource implements ReplicationSourceInterface {
       server.abort("Failed to operate on replication queue", e);
     }
   }
+
+  /**
+   * Tracks changes to the WALs in the replication queue.
+   */
+  public static class ReplicationQueueListener extends ZKListener {
+
+    private final ReplicationSource source;
+    private final String queueNode;
+    private final Path walDir;
+
+    public ReplicationQueueListener(ReplicationSource source,
+      ZKReplicationQueueStorage zkQueueStorage, ServerName producer, String queueId, Path walDir) {
+      super(zkQueueStorage.getZookeeper());
+      this.source = source;
+      this.queueNode = zkQueueStorage.getQueueNode(producer, queueId);
+      this.walDir = walDir;
+    }
+
+    @Override
+    public synchronized void nodeChildrenChanged(String path) {
+      if (path.equals(queueNode)) {
+        LOG.info("Detected change to the WALs in the replication queue {}", queueNode);
+        try {
+          ZKUtil.listChildrenNoWatch(watcher, queueNode).forEach(walName -> {
+            source.enqueueLog(new Path(walDir, walName));
+          });
+        } catch (KeeperException e) {
+          LOG.warn("Failed to read WALs in the replication queue {}", queueNode, e);
+        }
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 8863f14..56c8ee4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -35,7 +35,7 @@ public final class ReplicationSourceFactory {
 
   private ReplicationSourceFactory() {}
 
-  static ReplicationSourceInterface create(Configuration conf, String queueId) {
+  public static ReplicationSourceInterface create(Configuration conf, String queueId) {
     ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
     boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
     ReplicationSourceInterface src;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 296bd27..461276e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -50,6 +50,7 @@ public interface ReplicationSourceInterface {
    * @param queueStorage the replication queue storage
    * @param replicationPeer the replication peer
    * @param server the server which start and run this replication source
+   * @param producer the name of the region server which produces WALs to the replication queue
    * @param queueId the id of our replication queue
    * @param clusterId unique UUID for the cluster
    * @param walFileLengthProvider used to get the WAL length
@@ -57,8 +58,9 @@ public interface ReplicationSourceInterface {
    */
   void init(Configuration conf, FileSystem fs, Path walDir,
     ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
-    ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
-    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
+    ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+    UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+    throws IOException;
 
   /**
    * Add a log to the list of logs to replicate
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b6cb087..3dc2d12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -304,8 +304,8 @@ public class ReplicationSourceManager implements ReplicationSourceController {
     WALFileLengthProvider walFileLengthProvider =
       this.walFactory.getWALProvider() != null?
         this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> OptionalLong.empty();
-    src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, queueId, clusterId,
-      walFileLengthProvider, new MetricsSource(queueId));
+    src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, server.getServerName(),
+      queueId, clusterId, walFileLengthProvider, new MetricsSource(queueId));
     return src;
   }
 
@@ -925,8 +925,9 @@ public class ReplicationSourceManager implements ReplicationSourceController {
     CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
       this.clusterId.toString());
     final ReplicationSourceInterface crs = new CatalogReplicationSource();
-    crs.init(conf, fs, logDir, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
-      clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
+    crs.init(conf, fs, logDir, this, new NoopReplicationQueueStorage(), peer, server,
+      server.getServerName(), peer.getId(), clusterId, walProvider.getWALFileLengthProvider(),
+      new MetricsSource(peer.getId()));
     // Add listener on the provider so we can pick up the WAL to replicate on roll.
     WALActionsListener listener = new WALActionsListener() {
       @Override public void postLogRoll(Path oldPath, Path newPath) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 8a32e94..8f28dee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -48,8 +48,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   @Override
   public void init(Configuration conf, FileSystem fs, Path walDir,
     ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
-    ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
-    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+    ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+    UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+    throws IOException {
     this.queueId = queueId;
     this.metrics = metrics;
     this.walFileLengthProvider = walFileLengthProvider;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
index 9ceacee..db4152e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
@@ -18,10 +18,10 @@
 package org.apache.hadoop.hbase.replication;
 
 import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,13 +38,14 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
@@ -53,11 +54,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ
 
 @Category({ ReplicationTests.class, MediumTests.class })
 public class TestReplicationFetchServers extends TestReplicationBase {
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationFetchServers.class);
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestReplicationFetchServers.class);
 
+  private static HReplicationServer replicationServer;
+
   private static AtomicBoolean fetchFlag = new AtomicBoolean(false);
 
   public static class MyObserver implements MasterCoprocessor, MasterObserver {
@@ -77,6 +81,17 @@ public class TestReplicationFetchServers extends TestReplicationBase {
   public static void setUpBeforeClass() throws Exception {
     CONF2.set(MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
     TestReplicationBase.setUpBeforeClass();
+    replicationServer = new HReplicationServer(CONF2);
+    replicationServer.start();
+    UTIL2.waitFor(60000, () -> replicationServer.isOnline());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (replicationServer != null && !replicationServer.isStopped()) {
+      replicationServer.stop("test"); // stop first, while the test clusters are still up
+    }
+    TestReplicationBase.tearDownAfterClass();
+  }
 
   @Before
@@ -85,15 +100,23 @@ public class TestReplicationFetchServers extends TestReplicationBase {
   }
 
   @Test
-  public void testMasterListReplicationPeerServers() throws IOException, ServiceException {
+  public void testMasterListReplicationPeerServers() throws IOException {
     AsyncClusterConnection conn = UTIL2.getAsyncConnection();
     ServerName master = UTIL2.getAdmin().getMaster();
-    MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
-        conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), 1000));
-    ListReplicationSinkServersResponse resp = masterStub.listReplicationSinkServers(
-        null, ListReplicationSinkServersRequest.newBuilder().build());
-    List<ServerName> servers = ProtobufUtil.toServerNameList(resp.getServerNameList());
-    assertFalse(servers.isEmpty());
+    // Wait for the replication server to report to the master
+    UTIL2.waitFor(60000, () -> {
+      List<ServerName> servers = new ArrayList<>();
+      try {
+        MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
+          conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), 1000));
+        ListReplicationSinkServersResponse resp = masterStub.listReplicationSinkServers(
+          null, ListReplicationSinkServersRequest.newBuilder().build());
+        servers = ProtobufUtil.toServerNameList(resp.getServerNameList());
+      } catch (Exception e) {
+        LOG.debug("Failed to list replication servers", e);
+      }
+      return servers.size() == 1;
+    });
     assertTrue(fetchFlag.get());
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
similarity index 89%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
index 30660c6..d97667b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.ReplicationServerManager;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer;
-import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -64,13 +63,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationServer {
+public class TestReplicationServerSink {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestReplicationServer.class);
+      HBaseClassTestRule.forClass(TestReplicationServerSink.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServerSink.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -126,6 +125,7 @@ public class TestReplicationServer {
     if (!replicationServer.isStopped()) {
       replicationServer.stop("test");
     }
+    TEST_UTIL.waitFor(10000, () -> !replicationServer.isAlive());
     replicationServer = null;
     replicationServerName = null;
   }
@@ -145,22 +145,7 @@ public class TestReplicationServer {
     replicateWALEntryAndVerify(sinkPeer);
   }
 
-  /**
-   * Requests region server using {@link AsyncReplicationServerAdmin}
-   */
-  @Test
-  public void testReplicateWAL2() throws Exception {
-    AsyncClusterConnection conn =
-        TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
-    ServerName rs = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
-        .getRegionServer().getServerName();
-    AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs);
-
-    ReplicationServerSinkPeer sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin);
-    replicateWALEntryAndVerify(sinkPeer);
-  }
-
-  private void replicateWALEntryAndVerify(SinkPeer sinkPeer) throws Exception {
+  private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer) throws Exception {
     Entry[] entries = new Entry[BATCH_SIZE];
     for(int i = 0; i < BATCH_SIZE; i++) {
       entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java
new file mode 100644
index 0000000..843e5b1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationServerSource extends TestReplicationBase {
+
+  @ClassRule public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationServerSource.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServerSource.class);
+
+  private static HReplicationServer replicationServer;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL1.getConfiguration().setBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, true);
+    TestReplicationBase.setUpBeforeClass();
+    replicationServer = new HReplicationServer(UTIL1.getConfiguration());
+    replicationServer.start();
+    UTIL1.waitFor(60000, () -> replicationServer.isOnline());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    replicationServer.stop("Tear down after test");
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  /**
+   * Only one region server is started in the source cluster, so it is the sole WAL producer.
+   * A failure to start its replication source fails the test right away rather than being logged.
+   */
+  @Test
+  public void test() throws Exception {
+    ServerName producer = UTIL1.getMiniHBaseCluster().getRegionServer(0).getServerName();
+    LOG.info("Starting replication source for producer {} and peer {}", producer, PEER_ID2);
+    replicationServer.startReplicationSource(producer, PEER_ID2);
+    runSmallBatchTest();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 697a5ec..bd673bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -139,7 +139,7 @@ public class TestReplicationSource {
     String queueId = "qid";
     RegionServerServices rss =
       TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
+    rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null,
       p -> OptionalLong.empty(), new MetricsSource(queueId));
     try {
       rs.startup();
@@ -177,8 +177,8 @@ public class TestReplicationSource {
     String queueId = "qid";
     RegionServerServices rss =
       TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, null, manager, null, mockPeer, rss, queueId,
-      uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
+    rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, uuid,
+      p -> OptionalLong.empty(), new MetricsSource(queueId));
     try {
       rs.startup();
       TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
@@ -264,9 +264,9 @@ public class TestReplicationSource {
       Configuration testConf = HBaseConfiguration.create();
       testConf.setInt("replication.source.maxretriesmultiplier", 1);
       ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
-      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
-      source.init(testConf, null, null, manager, null, mockPeer, null, "testPeer",
-        null, p -> OptionalLong.empty(), null);
+      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+      source.init(testConf, null, null, manager, null, mockPeer, null, null, "testPeer", null,
+        p -> OptionalLong.empty(), null);
       ExecutorService executor = Executors.newSingleThreadExecutor();
       Future<?> future = executor.submit(
         () -> source.terminate("testing source termination"));
@@ -289,7 +289,7 @@ public class TestReplicationSource {
     ReplicationPeer mockPeer = mock(ReplicationPeer.class);
     Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
     Configuration testConf = HBaseConfiguration.create();
-    source.init(testConf, null, null, mockManager, null, mockPeer, null,
+    source.init(testConf, null, null, mockManager, null, mockPeer, null, null,
       "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
     ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
       conf, null, 0, null, source, null);
@@ -315,7 +315,7 @@ public class TestReplicationSource {
     reader.addEntryToBatch(batch, mockEntry);
     reader.entryBatchQueue.put(batch);
     source.terminate("test");
-    assertEquals(0, source.controller.getTotalBufferUsed().get());
+    assertEquals(0, mockManager.getTotalBufferUsed().get());
   }
 
   /**
@@ -536,7 +536,7 @@ public class TestReplicationSource {
     String queueId = "qid";
     RegionServerServices rss =
       TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
+    rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null,
       p -> OptionalLong.empty(), new MetricsSource(queueId));
     return rss;
   }
@@ -655,7 +655,7 @@ public class TestReplicationSource {
         TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
 
       ReplicationSource source = new ReplicationSource();
-      source.init(conf, null, null, manager, null, mockPeer, rss, id, null,
+      source.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), id, null,
         p -> OptionalLong.empty(), metrics);
 
       final Path log1 = new Path(logDir, "log-walgroup-a.8");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 44914a5..e6b745e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -414,7 +414,8 @@ public abstract class TestReplicationSourceManager {
     assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
     ReplicationSourceInterface source = new ReplicationSource();
     source.init(conf, fs, null, manager, manager.getQueueStorage(), rp1.getPeer("1"),
-      manager.getServer(), id, null, p -> OptionalLong.empty(), null);
+      manager.getServer(), manager.getServer().getServerName(), id, null, p -> OptionalLong.empty(),
+      null);
     source.cleanOldWALs(file2, false);
     // log1 should be deleted
     assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
@@ -630,16 +631,16 @@ public abstract class TestReplicationSourceManager {
 
       ReplicationSourceInterface source = new ReplicationSource();
       source.init(conf, fs, null, manager, manager.getQueueStorage(),
-        mockReplicationPeerForSyncReplication(peerId2), manager.getServer(), peerId2, null,
-        p -> OptionalLong.empty(), null);
+        mockReplicationPeerForSyncReplication(peerId2), manager.getServer(),
+        manager.getServer().getServerName(), peerId2, null, p -> OptionalLong.empty(), null);
       source.cleanOldWALs(walName, true);
       // still there if peer id does not match
       assertTrue(fs.exists(remoteWAL));
 
       source = new ReplicationSource();
       source.init(conf, fs, null, manager, manager.getQueueStorage(),
-        mockReplicationPeerForSyncReplication(slaveId), manager.getServer(), slaveId, null,
-        p -> OptionalLong.empty(), null);
+        mockReplicationPeerForSyncReplication(slaveId), manager.getServer(),
+        manager.getServer().getServerName(), slaveId, null, p -> OptionalLong.empty(), null);
       source.cleanOldWALs(walName, true);
       assertFalse(fs.exists(remoteWAL));
     } finally {
@@ -819,9 +820,10 @@ public abstract class TestReplicationSourceManager {
 
     @Override
     public void init(Configuration conf, FileSystem fs, Path walDir,
-      ReplicationSourceController overallController, ReplicationQueueStorage rq, ReplicationPeer rp,
-      Server server, String peerClusterId, UUID clusterId,
-      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+      ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+      ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+      UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+      throws IOException {
       throw new IOException("Failing deliberately");
     }
   }