You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this plain-text rendering.)
Posted to commits@hbase.apache.org by su...@apache.org on 2021/02/23 01:57:48 UTC

[hbase] 08/10: HBASE-24999 Master manages ReplicationServers (#2579)

This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit bd13d14bb59e5c4d1c69c19316f4bceb54ef7703
Author: XinSun <dd...@gmail.com>
AuthorDate: Wed Oct 28 18:59:57 2020 +0800

    HBASE-24999 Master manages ReplicationServers (#2579)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../server/master/ReplicationServerStatus.proto    |  34 ++++
 .../org/apache/hadoop/hbase/master/HMaster.java    |  10 +
 .../hadoop/hbase/master/MasterRpcServices.java     |  37 +++-
 .../apache/hadoop/hbase/master/MasterServices.java |   5 +
 .../hbase/master/ReplicationServerManager.java     | 204 ++++++++++++++++++++
 .../replication/HBaseReplicationEndpoint.java      | 148 ++++++--------
 .../hbase/replication/HReplicationServer.java      | 214 ++++++++++++++++++++-
 .../HBaseInterClusterReplicationEndpoint.java      |   1 -
 .../regionserver/ReplicationSyncUp.java            |   4 +-
 .../hbase/master/MockNoopMasterServices.java       |   5 +
 .../hbase/replication/TestReplicationBase.java     |   2 +
 .../hbase/replication/TestReplicationServer.java   |  57 +++++-
 12 files changed, 619 insertions(+), 102 deletions(-)

diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/ReplicationServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/ReplicationServerStatus.proto
new file mode 100644
index 0000000..d39a043
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/ReplicationServerStatus.proto
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "ReplicationServerStatusProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "server/master/RegionServerStatus.proto";
+
+service ReplicationServerStatusService {
+  // Report from a ReplicationServer to the master (server name + load); reuses RS report messages.
+  rpc ReplicationServerReport(RegionServerReportRequest)
+      returns(RegionServerReportResponse);
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 138a43f..5e8de56 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -303,6 +303,8 @@ public class HMaster extends HRegionServer implements MasterServices {
   // manager of assignment nodes in zookeeper
   private AssignmentManager assignmentManager;
 
+  // server manager to deal with replication server info
+  private ReplicationServerManager replicationServerManager;
 
   /**
    * Cache for the meta region replica's locations. Also tracks their changes to avoid stale
@@ -866,6 +868,8 @@ public class HMaster extends HRegionServer implements MasterServices {
         .collect(Collectors.toList());
     this.assignmentManager.setupRIT(ritList);
 
+    this.replicationServerManager = new ReplicationServerManager(this);
+
     // Start RegionServerTracker with listing of servers found with exiting SCPs -- these should
     // be registered in the deadServers set -- and with the list of servernames out on the
     // filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out).
@@ -1024,6 +1028,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.hbckChore = new HbckChore(this);
     getChoreService().scheduleChore(hbckChore);
     this.serverManager.startChore();
+    this.replicationServerManager.startChore();
 
     // Only for rolling upgrade, where we need to migrate the data in namespace table to meta table.
     if (!waitForNamespaceOnline()) {
@@ -1283,6 +1288,11 @@ public class HMaster extends HRegionServer implements MasterServices {
   }
 
   @Override
+  public ReplicationServerManager getReplicationServerManager() {
+    return this.replicationServerManager;
+  }
+
+  @Override
   public MasterFileSystem getMasterFileSystem() {
     return this.fileSystemManager;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index df5f0b0..93f368f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -401,6 +401,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Trans
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatusProtos.ReplicationServerStatusService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsService;
 
@@ -412,7 +413,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.
 public class MasterRpcServices extends RSRpcServices implements
     MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
     LockService.BlockingInterface, HbckService.BlockingInterface,
-    ClientMetaService.BlockingInterface {
+    ClientMetaService.BlockingInterface, ReplicationServerStatusService.BlockingInterface {
 
   private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
   private static final Logger AUDITLOG =
@@ -546,7 +547,7 @@ public class MasterRpcServices extends RSRpcServices implements
    */
   @Override
   protected List<BlockingServiceAndInterface> getServices() {
-    List<BlockingServiceAndInterface> bssi = new ArrayList<>(5);
+    List<BlockingServiceAndInterface> bssi = new ArrayList<>(6);
     bssi.add(new BlockingServiceAndInterface(
         MasterService.newReflectiveBlockingService(this),
         MasterService.BlockingInterface.class));
@@ -559,6 +560,9 @@ public class MasterRpcServices extends RSRpcServices implements
         HbckService.BlockingInterface.class));
     bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
         ClientMetaService.BlockingInterface.class));
+    bssi.add(new BlockingServiceAndInterface(
+        ReplicationServerStatusService.newReflectiveBlockingService(this),
+        ReplicationServerStatusService.BlockingInterface.class));
     bssi.addAll(super.getServices());
     return bssi;
   }
@@ -3414,4 +3418,33 @@ public class MasterRpcServices extends RSRpcServices implements
     }
     return builder.build();
   }
+
+  /** Handles a ReplicationServer's report: records its metrics and updates master metrics. */
+  @Override
+  public RegionServerReportResponse replicationServerReport(RpcController controller,
+      RegionServerReportRequest request) throws ServiceException {
+    try {
+      master.checkServiceStarted();
+      VersionInfo clientVersion = VersionInfoUtil.getCurrentClientVersionInfo();
+      String version = clientVersion == null ? "0.0.0" : clientVersion.getVersion();
+      int versionNumber =
+          clientVersion == null ? 0 : VersionInfoUtil.getVersionNumber(clientVersion);
+      ServerName serverName = ProtobufUtil.toServerName(request.getServer());
+      ClusterStatusProtos.ServerLoad load = request.getLoad();
+      ReplicationServerManager replicationServerManager = master.getReplicationServerManager();
+      // Grab the previous report before replacing it so the request delta can be computed.
+      ServerMetrics previous = replicationServerManager.getServerMetrics(serverName);
+      replicationServerManager.serverReport(serverName,
+          ServerMetricsBuilder.toServerMetrics(serverName, versionNumber, version, load));
+      if (load != null && master.metricsMaster != null) {
+        long previousRequests = previous == null ? 0 : previous.getRequestCount();
+        // Up our metrics by the requests served since the previous report.
+        master.metricsMaster.incrementRequests(
+            load.getTotalNumberOfRequests() - previousRequests);
+      }
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+    return RegionServerReportResponse.newBuilder().build();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 3f7dc02..bb8fdca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -103,6 +103,11 @@ public interface MasterServices extends Server {
   ServerManager getServerManager();
 
   /**
+   * @return Master's {@link ReplicationServerManager} instance.
+   */
+  ReplicationServerManager getReplicationServerManager();
+
+  /**
    * @return Master's instance of {@link ExecutorService}
    */
   ExecutorService getExecutorService();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ReplicationServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ReplicationServerManager.java
new file mode 100644
index 0000000..273b7f2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ReplicationServerManager.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.ServerMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The ReplicationServerManager class manages info about replication servers.
+ * <p>
+ * Maintains the map of online servers and expires servers that stop reporting.
+ * <p>
+ * Servers are distinguished in two different ways.  A given server has a
+ * location, specified by hostname and port, and of which there can only be one
+ * online at any given time.  A server instance is specified by the location
+ * (hostname and port) as well as the startcode (timestamp from when the server
+ * was started).  This is used to differentiate a restarted instance of a given
+ * server from the original instance.
+ */
+@InterfaceAudience.Private
+public class ReplicationServerManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationServerManager.class);
+
+  public static final String ONLINE_SERVER_REFRESH_INTERVAL =
+      "hbase.master.replication.server.refresh.interval";
+  public static final int ONLINE_SERVER_REFRESH_INTERVAL_DEFAULT = 60 * 1000; // 1 minute
+
+  private final MasterServices master;
+
+  /** Map of registered servers to their current load */
+  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
+    new ConcurrentSkipListMap<>();
+
+  private OnlineServerRefresher onlineServerRefresher;
+  private int refreshPeriod;
+
+  /**
+   * @param master the master whose configuration and chore service this class uses
+   */
+  public ReplicationServerManager(final MasterServices master) {
+    this.master = master;
+  }
+
+  /** Start the chore that periodically expires replication servers which stopped reporting. */
+  public void startChore() {
+    Configuration conf = master.getConfiguration();
+    refreshPeriod = conf.getInt(ONLINE_SERVER_REFRESH_INTERVAL,
+        ONLINE_SERVER_REFRESH_INTERVAL_DEFAULT);
+    onlineServerRefresher = new OnlineServerRefresher("ReplicationServerRefresher", refreshPeriod);
+    master.getChoreService().scheduleChore(onlineServerRefresher);
+  }
+
+  /** Stop the refresher chore started by {@link #startChore()}, if any. */
+  public void stop() {
+    if (onlineServerRefresher != null) {
+      onlineServerRefresher.cancel();
+    }
+  }
+
+  /**
+   * Record a replication server's report, registering the server if it is not yet online.
+   */
+  public void serverReport(ServerName sn, ServerMetrics sl) {
+    // Fast path: an already-registered server only has its metrics replaced.
+    if (null == this.onlineServers.replace(sn, sl)) {
+      if (!checkAndRecordNewServer(sn, sl)) {
+        LOG.info("ReplicationServerReport ignored, could not record the server: {}", sn);
+      }
+    }
+  }
+
+  /**
+   * Check whether a server with the same host and port is already online. If there is none,
+   * or the existing one does not have a larger start code, record the new server.
+   *
+   * @param serverName the server to check and record
+   * @param sl the server load on the server
+   * @return true if the server is recorded, otherwise, false
+   */
+  private boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
+    ServerName existingServer = null;
+    synchronized (this.onlineServers) {
+      existingServer = findServerWithSameHostnamePort(serverName);
+      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
+        LOG.info("ReplicationServer serverName={} rejected; we already have {} registered with "
+          + "same hostname and port", serverName, existingServer);
+        return false;
+      }
+      recordNewServer(serverName, sl);
+      // Note that we assume that same ts means same server, and don't expire in that case.
+      if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
+        LOG.info("Triggering server recovery; existingServer {} looks stale, new server: {}",
+            existingServer, serverName);
+        expireServer(existingServer);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Assumes onlineServers is locked.
+   * @return the online ServerName with matching hostname and port, or null if there is none.
+   */
+  private ServerName findServerWithSameHostnamePort(final ServerName serverName) {
+    ServerName end = ServerName.valueOf(serverName.getHostname(), serverName.getPort(),
+      Long.MAX_VALUE);
+
+    ServerName r = onlineServers.lowerKey(end);
+    if (r != null && ServerName.isSameAddress(r, serverName)) {
+      return r;
+    }
+    return null;
+  }
+
+  /**
+   * Assumes onlineServers is locked.
+   */
+  private void recordNewServer(final ServerName serverName, final ServerMetrics sl) {
+    LOG.info("Registering ReplicationServer={}", serverName);
+    this.onlineServers.put(serverName, sl);
+  }
+
+  /**
+   * Expire the passed server by removing it from the map of online servers.
+   * Callers within this class invoke it while holding the onlineServers lock.
+   */
+  public void expireServer(final ServerName serverName) {
+    LOG.info("Expiring ReplicationServer={}", serverName);
+    onlineServers.remove(serverName);
+  }
+
+  /**
+   * @return read-only view of the online servers mapped to their latest metrics
+   */
+  public Map<ServerName, ServerMetrics> getOnlineServers() {
+    // onlineServers is a ConcurrentSkipListMap, so callers can iterate the returned view
+    // without external locking (iteration is weakly consistent). Holding a lock just while
+    // wrapping the map would not protect the caller's later iteration.
+    return Collections.unmodifiableMap(this.onlineServers);
+  }
+
+  /**
+   * @return A copy of the internal list of online servers.
+   */
+  public List<ServerName> getOnlineServersList() {
+    return new ArrayList<>(this.onlineServers.keySet());
+  }
+
+  /**
+   * @param serverName server name
+   * @return ServerMetrics if serverName is known else null
+   */
+  public ServerMetrics getServerMetrics(final ServerName serverName) {
+    return this.onlineServers.get(serverName);
+  }
+
+  private class OnlineServerRefresher extends ScheduledChore {
+
+    public OnlineServerRefresher(String name, int p) {
+      super(name, master, p, 60 * 1000); // delay one minute before first execute
+    }
+
+    @Override
+    protected void chore() {
+      synchronized (onlineServers) {
+        long now = System.currentTimeMillis();
+        // Expire every server whose last report is older than refreshPeriod milliseconds.
+        onlineServers.forEach((serverName, metrics) -> {
+          if (metrics.getReportTimestamp() + refreshPeriod < now) {
+            expireServer(serverName);
+          }
+        });
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index e788d8c..115df76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.hbase.replication;
 
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT;
-import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -32,16 +32,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -67,6 +66,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ
 /**
  * A {@link BaseReplicationEndpoint} for replication endpoints whose
  * target cluster is an HBase cluster.
+ * <p>
+ * Compatible with two implementations to fetch sink servers, fetching replication servers by
+ * accessing master and fetching region servers by listening to ZK.
+ * Give priority to fetch replication servers as sink servers by accessing master. if slave cluster
+ * isn't supported(version < 3.x) or exceptions occur, fetch region servers as sink servers via ZK.
+ * So we always register ZK listener, but ignored the ZK event if replication servers are available.
+ * </p>
  */
 @InterfaceAudience.Private
 public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
@@ -74,9 +80,6 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
 
   private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
 
-  public static final String FETCH_SERVERS_USE_ZK_CONF_KEY =
-      "hbase.replication.fetch.servers.usezk";
-
   public static final String FETCH_SERVERS_INTERVAL_CONF_KEY =
       "hbase.replication.fetch.servers.interval";
   public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000; // 10 mins
@@ -112,10 +115,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
 
   private List<ServerName> sinkServers = new ArrayList<>(0);
 
-  private AsyncClusterConnection peerConnection;
-  private boolean fetchServersUseZk = false;
+  private volatile boolean fetchServersUseZk = false;
   private FetchServersChore fetchServersChore;
-  private int shortOperationTimeout;
+  private int operationTimeout;
 
   /*
    * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
@@ -136,6 +138,8 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
     this.badSinkThreshold =
       ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
     this.badReportCounts = Maps.newHashMap();
+    this.operationTimeout = ctx.getLocalConfiguration().getInt(
+        HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
   }
 
   protected void disconnect() {
@@ -144,20 +148,12 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
         zkw.close();
       }
     }
-    if (this.conn != null) {
-      try {
-        this.conn.close();
-        this.conn = null;
-      } catch (IOException e) {
-        LOG.warn("{} Failed to close the connection", ctx.getPeerId());
-      }
-    }
     if (fetchServersChore != null) {
       fetchServersChore.cancel();
     }
-    if (peerConnection != null) {
+    if (conn != null) {
       try {
-        peerConnection.close();
+        conn.close();
       } catch (IOException e) {
         LOG.warn("Attempt to close peerConnection failed.", e);
       }
@@ -192,27 +188,10 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   }
 
   @Override
-  protected synchronized void doStart() {
-    this.shortOperationTimeout = ctx.getLocalConfiguration().getInt(
-        HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
+  protected void doStart() {
     try {
-      if (ctx.getLocalConfiguration().getBoolean(FETCH_SERVERS_USE_ZK_CONF_KEY, false)) {
-        fetchServersUseZk = true;
-      } else {
-        try {
-          if (ReplicationUtils.isPeerClusterSupportReplicationOffload(getPeerConnection())) {
-            fetchServersChore = new FetchServersChore(ctx.getServer(), this);
-            ctx.getServer().getChoreService().scheduleChore(fetchServersChore);
-            fetchServersUseZk = false;
-          } else {
-            fetchServersUseZk = true;
-          }
-        } catch (Throwable t) {
-          fetchServersUseZk = true;
-          LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.",
-              ctx.getPeerId(), t);
-        }
-      }
+      fetchServersChore = new FetchServersChore(ctx.getServer(), this);
+      ctx.getServer().getChoreService().scheduleChore(fetchServersChore);
       reloadZkWatcher();
       connectPeerCluster();
       notifyStarted();
@@ -255,9 +234,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
       }
       zkw = new ZKWatcher(ctx.getConfiguration(),
           "connection to cluster: " + ctx.getPeerId(), this);
-      if (fetchServersUseZk) {
-        zkw.registerListener(new PeerRegionServerListener(this));
-      }
+      zkw.registerListener(new PeerRegionServerListener(this));
     }
   }
 
@@ -283,38 +260,25 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   }
 
   /**
-   * Get the connection to peer cluster
-   * @return connection to peer cluster
-   * @throws IOException If anything goes wrong connecting
-   */
-  private synchronized AsyncClusterConnection getPeerConnection() throws IOException {
-    if (peerConnection == null) {
-      Configuration conf = ctx.getConfiguration();
-      peerConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
-          UserProvider.instantiate(conf).getCurrent());
-    }
-    return peerConnection;
-  }
-
-  /**
    * Get the list of all the servers that are responsible for replication sink
    * from the specified peer master
-   * @return list of server addresses or an empty list if the slave is unavailable
+   * @return list of server addresses
    */
-  protected List<ServerName> fetchSlavesAddresses() {
+  protected List<ServerName> fetchSlavesAddresses() throws IOException {
     try {
-      AsyncClusterConnection peerConn = getPeerConnection();
-      ServerName master = FutureUtils.get(peerConn.getAdmin().getMaster());
+      ServerName master = FutureUtils.get(conn.getAdmin().getMaster());
       MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
-        peerConn.getRpcClient()
-          .createBlockingRpcChannel(master, User.getCurrent(), shortOperationTimeout));
+        conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), operationTimeout));
       ListReplicationSinkServersResponse resp = masterStub
         .listReplicationSinkServers(null, ListReplicationSinkServersRequest.newBuilder().build());
       return ProtobufUtil.toServerNameList(resp.getServerNameList());
-    } catch (ServiceException | IOException e) {
+    } catch (ServiceException e) {
       LOG.error("Peer {} fetches servers failed", ctx.getPeerId(), e);
+      throw ProtobufUtil.getRemoteException(e);
+    } catch (IOException e) {
+      LOG.error("Peer {} fetches servers failed", ctx.getPeerId(), e);
+      throw e;
     }
-    return Collections.emptyList();
   }
 
   /**
@@ -344,20 +308,34 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
     return addresses;
   }
 
-  protected synchronized void chooseSinks() {
-    List<ServerName> slaveAddresses = Collections.emptyList();
-    if (fetchServersUseZk) {
+  protected void chooseSinks() {
+    List<ServerName> slaveAddresses = Collections.emptyList();
+    boolean useZk = fetchServersUseZk;
+    try {
+      if (!useZk || ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
+        useZk = false;
+        slaveAddresses = fetchSlavesAddresses();
+      } else {
+        useZk = true;
+      }
+    } catch (Throwable t) {
+      LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.", ctx.getPeerId(), t);
+      useZk = true;
+    }
+    if (useZk) {
       slaveAddresses = fetchSlavesAddressesByZK();
-    } else {
-      slaveAddresses = fetchSlavesAddresses();
     }
+    // Pick a random `ratio` fraction of the discovered servers as the sink set.
     if (slaveAddresses.isEmpty()) {
       LOG.warn("No sinks available at peer. Will not be able to replicate");
     }
     Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
     int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
-    this.sinkServers = slaveAddresses.subList(0, numSinks);
-    badReportCounts.clear();
+    synchronized (this) {
+      this.fetchServersUseZk = useZk;
+      this.sinkServers = slaveAddresses.subList(0, numSinks);
+      badReportCounts.clear();
+    }
   }
 
   protected synchronized int getNumSinks() {
@@ -368,16 +346,18 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
    * Get a randomly-chosen replication sink to replicate to.
    * @return a replication sink to replicate to
    */
-  protected synchronized SinkPeer getReplicationSink() throws IOException {
-    if (sinkServers.isEmpty()) {
-      LOG.info("Current list of sinks is out of date or empty, updating");
-      chooseSinks();
-    }
-    if (sinkServers.isEmpty()) {
-      throw new IOException("No replication sinks are available");
+  protected SinkPeer getReplicationSink() throws IOException {
+    ServerName serverName;
+    synchronized (this) {
+      if (sinkServers.isEmpty()) {
+        LOG.info("Current list of sinks is out of date or empty, updating");
+        chooseSinks();
+      }
+      if (sinkServers.isEmpty()) {
+        throw new IOException("No replication sinks are available");
+      }
+      serverName = sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
     }
-    ServerName serverName =
-      sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
     return createSinkPeer(serverName);
   }
 
@@ -438,7 +418,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
 
     @Override
     public synchronized void nodeChildrenChanged(String path) {
-      if (path.equals(regionServerListNode)) {
+      if (replicationEndpoint.fetchServersUseZk && path.equals(regionServerListNode)) {
         LOG.info("Detected change to peer region servers, fetching updated list");
         replicationEndpoint.chooseSinks();
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index 31dec0c..4b53bb7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
+import java.lang.management.MemoryUsage;
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -28,22 +29,37 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.ReplicationService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatusProtos.ReplicationServerStatusService;
+
 /**
  * HReplicationServer which is responsible to all replication stuff. It checks in with
  * the HMaster. There are many HReplicationServers in a single HBase deployment.
@@ -86,9 +102,14 @@ public class HReplicationServer extends Thread implements Server {
   // A sleeper that sleeps for msgInterval.
   protected final Sleeper sleeper;
 
+  private final int shortOperationTimeout;
+
   // zookeeper connection and watcher
   protected final ZKWatcher zooKeeper;
 
+  // master address tracker
+  private final MasterAddressTracker masterAddressTracker;
+
   /**
    * The asynchronous cluster connection to be shared by services.
    */
@@ -98,6 +119,17 @@ public class HReplicationServer extends Thread implements Server {
 
   protected final ReplicationServerRpcServices rpcServices;
 
+  // Stub to do region server status calls against the master.
+  private volatile ReplicationServerStatusService.BlockingInterface rssStub;
+
+  // RPC client. Used to make the stub above that does region server status checking.
+  private RpcClient rpcClient;
+
+  /**
+   * ChoreService used to schedule tasks that we want to run periodically
+   */
+  private ChoreService choreService;
+
   public HReplicationServer(final Configuration conf) throws IOException {
     TraceUtil.initTracer(conf);
     try {
@@ -116,16 +148,22 @@ public class HReplicationServer extends Thread implements Server {
       this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000);
       this.sleeper = new Sleeper(this.msgInterval, this);
 
+      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
+          HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
+
       // Some unit tests don't need a cluster, so no zookeeper at all
       if (!conf.getBoolean("hbase.testing.nocluster", false)) {
         // Open connection to zookeeper and set primary watcher
         zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
             rpcServices.isa.getPort(), this, false);
+        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
+        masterAddressTracker.start();
       } else {
         zooKeeper = null;
+        masterAddressTracker = null;
       }
-
       this.rpcServices.start(zooKeeper);
+      this.choreService = new ChoreService(getName(), true);
     } catch (Throwable t) {
       // Make sure we log the exception. HReplicationServer is often started via reflection and the
       // cause of failed startup is lost.
@@ -150,6 +188,7 @@ public class HReplicationServer extends Thread implements Server {
     } catch (Throwable e) {
       abort("Fatal exception during initialization", e);
     }
+
     try {
       setupReplication();
       startReplicationService();
@@ -161,6 +200,7 @@ public class HReplicationServer extends Thread implements Server {
       while (!isStopped()) {
         long now = System.currentTimeMillis();
         if ((now - lastMsg) >= msgInterval) {
+          tryReplicationServerReport(lastMsg, now);
           lastMsg = System.currentTimeMillis();
         }
         if (!isStopped() && !isAborted()) {
@@ -177,6 +217,22 @@ public class HReplicationServer extends Thread implements Server {
       abort(t.getMessage(), t);
     }
 
+    if (this.asyncClusterConnection != null) {
+      try {
+        this.asyncClusterConnection.close();
+      } catch (IOException e) {
+        // Although the {@link Closeable} interface throws an {@link
+        // IOException}, in reality, the implementation would never do that.
+        LOG.warn("Attempt to close server's AsyncClusterConnection failed.", e);
+      }
+    }
+    if (rssStub != null) {
+      rssStub = null;
+    }
+    if (rpcClient != null) {
+      this.rpcClient.close();
+    }
+
     if (this.zooKeeper != null) {
       this.zooKeeper.close();
     }
@@ -204,11 +260,13 @@ public class HReplicationServer extends Thread implements Server {
   private void preRegistrationInitialization() {
     try {
       setupClusterConnection();
+      // Setup RPC client for master communication
+      this.rpcClient = asyncClusterConnection.getRpcClient();
     } catch (Throwable t) {
       // Call stop if error or process will stick around for ever since server
       // puts up non-daemon threads.
       this.rpcServices.stop();
-      abort("Initialization of RS failed.  Hence aborting RS.", t);
+      abort("Initialization of ReplicationServer failed. Hence aborting ReplicationServer.", t);
     }
   }
 
@@ -272,7 +330,7 @@ public class HReplicationServer extends Thread implements Server {
 
   @Override
   public ChoreService getChoreService() {
-    return null;
+    return this.choreService;
   }
 
   @Override
@@ -329,7 +387,7 @@ public class HReplicationServer extends Thread implements Server {
       throws IOException {
     // read in the name of the sink replication class from the config file.
     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
-        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+        HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
 
     server.replicationSinkService = newReplicationInstance(sinkClassname,
         ReplicationSinkService.class, conf, server);
@@ -388,4 +446,152 @@ public class HReplicationServer extends Thread implements Server {
   protected boolean setAbortRequested() {
     return abortRequested.compareAndSet(false, true);
   }
+
+  /**
+   * Sends one load report to the active master, creating the status stub first if needed.
+   * <p>
+   * If no master stub can be obtained the report is skipped. If the master rejects this server
+   * with {@link YouAreDeadException} the exception is rethrown. Any other failure is logged, the
+   * cached stub is dropped, and {@link #createReplicationServerStatusStub(boolean)} is called to
+   * reconnect; that call blocks until a master is found or this server is stopped.
+   * @param reportStartTime start of the reporting interval (the run loop passes
+   *   {@code System.currentTimeMillis()} values)
+   * @param reportEndTime end of the reporting interval
+   * @throws IOException if the master reports this server as dead ({@link YouAreDeadException})
+   */
+  private void tryReplicationServerReport(long reportStartTime, long reportEndTime)
+      throws IOException {
+    ReplicationServerStatusService.BlockingInterface rss = rssStub;
+    if (rss == null) {
+      ServerName masterServerName = createReplicationServerStatusStub(true);
+      rss = rssStub;
+      if (masterServerName == null || rss == null) {
+        return;
+      }
+    }
+    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
+    try {
+      RegionServerReportRequest.Builder request = RegionServerReportRequest
+          .newBuilder();
+      request.setServer(ProtobufUtil.toServerName(this.serverName));
+      request.setLoad(sl);
+      rss.replicationServerReport(null, request.build());
+    } catch (ServiceException se) {
+      IOException ioe = ProtobufUtil.getRemoteException(se);
+      if (ioe instanceof YouAreDeadException) {
+        // This will be caught and handled as a fatal error in run()
+        throw ioe;
+      }
+      // Make the failed report visible instead of silently reconnecting.
+      LOG.warn("Failed to report to master; will reconnect and retry", ioe);
+      if (rssStub == rss) {
+        rssStub = null;
+      }
+      // Couldn't connect to the master, get location from zk and reconnect
+      // Method blocks until new master is found or we are stopped
+      createReplicationServerStatusStub(true);
+    }
+  }
+
+  /**
+   * Builds the load report that is sent to the master.
+   * <p>
+   * Heap figures are converted from bytes to megabytes. When heap usage cannot be read they fall
+   * back to {@code -1} bytes, which integer division turns into 0 MB. Replication load comes
+   * only from the sink service, which is asked to refresh before its value is read.
+   * @param reportStartTime start of the reporting interval
+   * @param reportEndTime end of the reporting interval
+   * @return the assembled server load message
+   */
+  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
+    final MemoryUsage heapUsage = MemorySizeUtil.safeGetHeapMemoryUsage();
+    final long usedHeapBytes = (heapUsage != null) ? heapUsage.getUsed() : -1L;
+    final long maxHeapBytes = (heapUsage != null) ? heapUsage.getMax() : -1L;
+
+    ClusterStatusProtos.ServerLoad.Builder loadBuilder =
+        ClusterStatusProtos.ServerLoad.newBuilder()
+            .setTotalNumberOfRequests(rpcServices.requestCount.sum())
+            .setUsedHeapMB((int) (usedHeapBytes / 1024 / 1024))
+            .setMaxHeapMB((int) (maxHeapBytes / 1024 / 1024))
+            .setReportStartTime(reportStartTime)
+            .setReportEndTime(reportEndTime);
+
+    // Only the sink side contributes replication load here; refresh so the values are current.
+    ReplicationSinkService sinkService = getReplicationSinkService();
+    ReplicationLoad sinkLoad = null;
+    if (sinkService != null) {
+      sinkLoad = sinkService.refreshAndGetReplicationLoad();
+    }
+    if (sinkLoad != null) {
+      loadBuilder.setReplLoadSink(sinkLoad.getReplicationLoadSink());
+    }
+    return loadBuilder.build();
+  }
+
+  /**
+   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
+   * connection, the current rssStub must be null. Method will block until a master is available.
+   * You can break from this block by requesting the server stop.
+   * <p>
+   * When this server was built with {@code hbase.testing.nocluster} there is no ZooKeeper and no
+   * {@link MasterAddressTracker}. In that case no stub is created and {@code null} is returned,
+   * so callers skip the master interaction instead of failing with a NullPointerException.
+   * @param refresh If true then master address will be read from ZK, otherwise use cached data
+   * @return master + port, or null if server has been stopped or no master tracker exists
+   */
+  private synchronized ServerName createReplicationServerStatusStub(boolean refresh) {
+    if (masterAddressTracker == null) {
+      // No ZooKeeper connection (nocluster mode): there is no master to connect to.
+      return null;
+    }
+    if (rssStub != null) {
+      return masterAddressTracker.getMasterAddress();
+    }
+    ServerName sn = null;
+    long previousLogTime = 0;
+    ReplicationServerStatusService.BlockingInterface intRssStub = null;
+    boolean interrupted = false;
+    try {
+      while (keepLooping()) {
+        sn = this.masterAddressTracker.getMasterAddress(refresh);
+        if (sn == null) {
+          if (!keepLooping()) {
+            // give up with no connection.
+            LOG.debug("No master found and cluster is stopped; bailing out");
+            return null;
+          }
+          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
+            LOG.debug("No master found; retry");
+            previousLogTime = System.currentTimeMillis();
+          }
+          refresh = true; // let's try pull it from ZK directly
+          if (sleepInterrupted(200)) {
+            interrupted = true;
+          }
+          continue;
+        }
+
+        try {
+          BlockingRpcChannel channel =
+              this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
+                  shortOperationTimeout);
+          intRssStub = ReplicationServerStatusService.newBlockingStub(channel);
+          break;
+        } catch (IOException e) {
+          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
+            e = e instanceof RemoteException ?
+                ((RemoteException)e).unwrapRemoteException() : e;
+            if (e instanceof ServerNotRunningYetException) {
+              LOG.info("Master isn't available yet, retrying");
+            } else {
+              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
+            }
+            previousLogTime = System.currentTimeMillis();
+          }
+          if (sleepInterrupted(200)) {
+            interrupted = true;
+          }
+        }
+      }
+    } finally {
+      // sleepInterrupted() swallows the interrupt; restore the flag for our caller.
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    this.rssStub = intRssStub;
+    return sn;
+  }
+
+  /**
+   * Tells retry loops whether to continue.
+   * <p>
+   * Only the {@code stopped} flag is consulted. Cluster shutdown and filesystem health are not
+   * checked here.
+   * @return {@code true} while {@code stopped} is unset, meaning the caller should keep looping;
+   *   {@code false} once it has been set
+   */
+  private boolean keepLooping() {
+    return !this.stopped;
+  }
+
+  /**
+   * Sleeps for the given time without propagating {@link InterruptedException}.
+   * <p>
+   * When the sleep is interrupted a warning is logged and {@code true} is returned. The
+   * thread's interrupt status has been cleared at that point (that is how
+   * {@link Thread#sleep(long)} reports interruption), so callers must restore it themselves if
+   * they need it.
+   * @param millis how long to sleep, in milliseconds
+   * @return {@code true} if the sleep was interrupted, {@code false} if it completed normally
+   */
+  private static boolean sleepInterrupted(long millis) {
+    try {
+      Thread.sleep(millis);
+      return false;
+    } catch (InterruptedException ie) {
+      LOG.warn("Interrupted while sleeping");
+      return true;
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index d8517b0..f0448b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index a43be29..55b20c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -104,11 +104,13 @@ public class ReplicationSyncUp extends Configured implements Tool {
   class DummyServer implements Server {
     String hostname;
     ZKWatcher zkw;
+    ChoreService choreService;
 
     DummyServer(ZKWatcher zkw) {
       // a unique name in case the first run fails
       hostname = System.currentTimeMillis() + ".SyncUpTool.replication.org";
       this.zkw = zkw;
+      this.choreService = new ChoreService("ReplicationSyncUpDummyServer", true);
     }
 
     DummyServer(String hostname) {
@@ -160,7 +162,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
 
     @Override
     public ChoreService getChoreService() {
-      return null;
+      return choreService;
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index ed2edb0..52ca278 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -154,6 +154,11 @@ public class MockNoopMasterServices implements MasterServices {
   }
 
   @Override
+  public ReplicationServerManager getReplicationServerManager() {
+    return null;
+  }
+
+  @Override
   public ZKWatcher getZooKeeper() {
     return null;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index eca0d67..78fec0e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -198,6 +199,7 @@ public class TestReplicationBase {
     conf.setFloat("replication.source.ratio", 1.0f);
     conf.setBoolean("replication.source.eof.autorecovery", true);
     conf.setLong("hbase.serial.replication.waiting.ms", 100);
+    conf.setLong(HBASE_CLIENT_OPERATION_TIMEOUT, 5000);
   }
 
   static void configureClusters(HBaseTestingUtility util1,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
index 0ef23f2..30660c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.master.ReplicationServerManager.ONLINE_SERVER_REFRESH_INTERVAL;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -38,7 +41,9 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.ReplicationServerManager;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -74,6 +79,7 @@ public class TestReplicationServer {
   private static HMaster MASTER;
 
   private static HReplicationServer replicationServer;
+  private static ServerName replicationServerName;
 
   private static Path baseNamespaceDir;
   private static Path hfileArchiveDir;
@@ -86,14 +92,11 @@ public class TestReplicationServer {
 
   @BeforeClass
   public static void beforeClass() throws Exception {
+    CONF.setLong(HBASE_CLIENT_OPERATION_TIMEOUT, 1000);
+    CONF.setLong(ONLINE_SERVER_REFRESH_INTERVAL, 10000);
     TEST_UTIL.startMiniCluster();
     MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
-
-    replicationServer = new HReplicationServer(CONF);
-    replicationServer.start();
-
     TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
-    TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline());
 
     Path rootDir = CommonFSUtils.getRootDir(CONF);
     baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR));
@@ -108,6 +111,11 @@ public class TestReplicationServer {
 
   @Before
   public void before() throws Exception {
+    replicationServer = new HReplicationServer(CONF);
+    replicationServer.start();
+    TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline());
+    replicationServerName = replicationServer.getServerName();
+
     TEST_UTIL.createTable(TABLENAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLENAME);
   }
@@ -115,6 +123,11 @@ public class TestReplicationServer {
   @After
   public void after() throws IOException {
     TEST_UTIL.deleteTableIfAny(TABLENAME);
+    if (!replicationServer.isStopped()) {
+      replicationServer.stop("test");
+    }
+    replicationServer = null;
+    replicationServerName = null;
   }
 
   /**
@@ -125,10 +138,10 @@ public class TestReplicationServer {
     AsyncClusterConnection conn =
         TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
     AsyncReplicationServerAdmin replAdmin =
-        conn.getReplicationServerAdmin(replicationServer.getServerName());
+        conn.getReplicationServerAdmin(replicationServerName);
 
     ReplicationServerSinkPeer sinkPeer =
-        new ReplicationServerSinkPeer(replicationServer.getServerName(), replAdmin);
+        new ReplicationServerSinkPeer(replicationServerName, replAdmin);
     replicateWALEntryAndVerify(sinkPeer);
   }
 
@@ -143,12 +156,11 @@ public class TestReplicationServer {
         .getRegionServer().getServerName();
     AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs);
 
-    ReplicationServerSinkPeer
-      sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin);
+    ReplicationServerSinkPeer sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin);
     replicateWALEntryAndVerify(sinkPeer);
   }
 
-  private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer) throws Exception {
+  private void replicateWALEntryAndVerify(SinkPeer sinkPeer) throws Exception {
     Entry[] entries = new Entry[BATCH_SIZE];
     for(int i = 0; i < BATCH_SIZE; i++) {
       entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
@@ -175,4 +187,29 @@ public class TestReplicationServer {
     edit.add(new KeyValue(row, Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY), timestamp, row));
     return new WAL.Entry(key, edit);
   }
+
+  @Test
+  public void testReplicationServerReport() throws Exception {
+    ReplicationServerManager replicationServerManager = MASTER.getReplicationServerManager();
+    assertNotNull(replicationServerManager);
+    TEST_UTIL.waitFor(60000, () -> !replicationServerManager.getOnlineServers().isEmpty()
+        && null != replicationServerManager.getServerMetrics(replicationServerName));
+    // put data via replication server
+    testReplicateWAL();
+    TEST_UTIL.waitFor(60000, () -> replicationServer.rpcServices.requestCount.sum() > 0
+        && replicationServer.rpcServices.requestCount.sum() == replicationServerManager
+        .getServerMetrics(replicationServerName).getRequestCount());
+  }
+
+  /**
+   * Verifies that the master drops a stopped replication server from its online set and
+   * discards its metrics.
+   */
+  @Test
+  public void testReplicationServerExpire() throws Exception {
+    ReplicationServerManager replicationServerManager = MASTER.getReplicationServerManager();
+    assertNotNull(replicationServerManager);
+    TEST_UTIL.waitFor(60000, () -> !replicationServerManager.getOnlineServers().isEmpty()
+        && null != replicationServerManager.getServerMetrics(replicationServerName));
+
+    replicationServer.stop("test");
+
+    // Use a lambda, not "getOnlineServers()::isEmpty". A bound method reference evaluates
+    // getOnlineServers() only once, so every poll would recheck that same returned collection.
+    TEST_UTIL.waitFor(180000, 1000, () -> replicationServerManager.getOnlineServers().isEmpty());
+    assertNull(replicationServerManager.getServerMetrics(replicationServerName));
+  }
 }