You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/09/23 00:36:13 UTC

[hbase] branch master updated: HBASE-25074 Refactor ReplicationSinkManager: reduce code and make it easy to understand (#2430)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e910a5  HBASE-25074 Refactor ReplicationSinkManager: reduce code and make it easy to understand (#2430)
7e910a5 is described below

commit 7e910a573f30a9995cb779fa55a6911629ac2e5f
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Wed Sep 23 08:30:43 2020 +0800

    HBASE-25074 Refactor ReplicationSinkManager: reduce code and make it easy to understand (#2430)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../replication/HBaseReplicationEndpoint.java      | 215 +++++++++++++++------
 .../HBaseInterClusterReplicationEndpoint.java      |  51 +----
 .../regionserver/ReplicationSinkManager.java       | 193 ------------------
 .../replication/TestHBaseReplicationEndpoint.java  | 210 ++++++++++++++++++++
 .../regionserver/TestReplicationSinkManager.java   | 210 --------------------
 .../TestSerialReplicationEndpoint.java             |  10 +-
 6 files changed, 382 insertions(+), 507 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 3cde0d5..850a791 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -22,8 +22,16 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.zookeeper.ZKListener;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.Abortable;
@@ -38,6 +46,9 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+
 /**
  * A {@link BaseReplicationEndpoint} for replication endpoints whose
  * target cluster is an HBase cluster.
@@ -50,8 +61,58 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
 
   private ZKWatcher zkw = null;
 
-  private List<ServerName> regionServers = new ArrayList<>(0);
-  private long lastRegionServerUpdate;
+  protected Configuration conf;
+
+  protected AsyncClusterConnection conn;
+
+  /**
+   * Default maximum number of times a replication sink can be reported as bad before
+   * it will no longer be provided as a sink for replication without the pool of
+   * replication sinks being refreshed.
+   */
+  public static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
+
+  /**
+   * Default ratio of the total number of peer cluster region servers to consider
+   * replicating to.
+   */
+  public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
+
+  // Ratio of total number of potential peer region servers to be used
+  private float ratio;
+
+  // Maximum number of times a sink can be reported as bad before the pool of
+  // replication sinks is refreshed
+  private int badSinkThreshold;
+  // Count of "bad replication sink" reports per peer sink
+  private Map<ServerName, Integer> badReportCounts;
+
+  private List<ServerName> sinkServers = new ArrayList<>(0);
+
+  /*
+   * Some implementations of HBaseInterClusterReplicationEndpoint may need to instantiate a
+   * different Connection implementation, or initialize it in a different way, so createConnection
+   * is defined as protected to allow overriding.
+   */
+  protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
+    return ClusterConnectionFactory.createAsyncClusterConnection(conf,
+      null, User.getCurrent());
+  }
+
+  @Override
+  public void init(Context context) throws IOException {
+    super.init(context);
+    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
+    // TODO: This connection is replication-specific, or we should make it particular to
+    // replication and apply replication-specific settings, such as the compression or codec to
+    // use when passing Cells.
+    this.conn = createConnection(this.conf);
+    this.ratio =
+      ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
+    this.badSinkThreshold =
+      ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
+    this.badReportCounts = Maps.newHashMap();
+  }
 
   protected synchronized void disconnect() {
     if (zkw != null) {
@@ -63,7 +124,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
    * A private method used to re-establish a zookeeper session with a peer cluster.
    * @param ke
    */
-  protected void reconnect(KeeperException ke) {
+  private void reconnect(KeeperException ke) {
     if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
         || ke instanceof AuthFailedException) {
       String clusterKey = ctx.getPeerConfig().getClusterKey();
@@ -118,22 +179,16 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   }
 
   /**
-   * Get the ZK connection to this peer
-   * @return zk connection
-   */
-  protected synchronized ZKWatcher getZkw() {
-    return zkw;
-  }
-
-  /**
    * Closes the current ZKW (if not null) and creates a new one
    * @throws IOException If anything goes wrong connecting
    */
-  synchronized void reloadZkWatcher() throws IOException {
-    if (zkw != null) zkw.close();
+  private synchronized void reloadZkWatcher() throws IOException {
+    if (zkw != null) {
+      zkw.close();
+    }
     zkw = new ZKWatcher(ctx.getConfiguration(),
         "connection to cluster: " + ctx.getPeerId(), this);
-    getZkw().registerListener(new PeerRegionServerListener(this));
+    zkw.registerListener(new PeerRegionServerListener(this));
   }
 
   @Override
@@ -150,13 +205,19 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
 
   /**
    * Get the list of all the region servers from the specified peer
-   * @param zkw zk connection to use
+   *
    * @return list of region server addresses or an empty list if the slave is unavailable
    */
-  protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw)
-      throws KeeperException {
-    List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw,
-            zkw.getZNodePaths().rsZNode);
+  protected List<ServerName> fetchSlavesAddresses() {
+    List<String> children = null;
+    try {
+      children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
+    } catch (KeeperException ke) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Fetch slaves addresses failed", ke);
+      }
+      reconnect(ke);
+    }
     if (children == null) {
       return Collections.emptyList();
     }
@@ -167,43 +228,70 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
     return addresses;
   }
 
+  protected synchronized void chooseSinks() {
+    List<ServerName> slaveAddresses = fetchSlavesAddresses();
+    if (slaveAddresses.isEmpty()) {
+      LOG.warn("No sinks available at peer. Will not be able to replicate");
+    }
+    Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
+    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
+    this.sinkServers = slaveAddresses.subList(0, numSinks);
+    badReportCounts.clear();
+  }
+
+  protected synchronized int getNumSinks() {
+    return sinkServers.size();
+  }
+
   /**
-   * Get a list of all the addresses of all the available region servers
-   * for this peer cluster, or an empty list if no region servers available at peer cluster.
-   * @return list of addresses
+   * Get a randomly-chosen replication sink to replicate to.
+   * @return a replication sink to replicate to
    */
-  // Synchronize peer cluster connection attempts to avoid races and rate
-  // limit connections when multiple replication sources try to connect to
-  // the peer cluster. If the peer cluster is down we can get out of control
-  // over time.
-  public synchronized List<ServerName> getRegionServers() {
-    try {
-      setRegionServers(fetchSlavesAddresses(this.getZkw()));
-    } catch (KeeperException ke) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Fetch slaves addresses failed", ke);
-      }
-      reconnect(ke);
+  protected synchronized SinkPeer getReplicationSink() throws IOException {
+    if (sinkServers.isEmpty()) {
+      LOG.info("Current list of sinks is out of date or empty, updating");
+      chooseSinks();
     }
-    return regionServers;
+    if (sinkServers.isEmpty()) {
+      throw new IOException("No replication sinks are available");
+    }
+    ServerName serverName =
+      sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
+    return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
   }
 
   /**
-   * Set the list of region servers for that peer
-   * @param regionServers list of addresses for the region servers
+   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
+   * failed). If a single SinkPeer is reported as bad more than
+   * replication.bad.sink.threshold times, it will be removed
+   * from the pool of potential replication targets.
+   *
+   * @param sinkPeer The SinkPeer that had a failed replication attempt on it
    */
-  public synchronized void setRegionServers(List<ServerName> regionServers) {
-    this.regionServers = regionServers;
-    lastRegionServerUpdate = System.currentTimeMillis();
+  protected synchronized void reportBadSink(SinkPeer sinkPeer) {
+    ServerName serverName = sinkPeer.getServerName();
+    int badReportCount = badReportCounts.compute(serverName, (k, v) -> v == null ? 1 : v + 1);
+    if (badReportCount > badSinkThreshold) {
+      this.sinkServers.remove(serverName);
+      if (sinkServers.isEmpty()) {
+        chooseSinks();
+      }
+    }
   }
 
   /**
-   * Get the timestamp at which the last change occurred to the list of region servers to replicate
-   * to.
-   * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
+   * Report that a {@code SinkPeer} successfully replicated a chunk of data.
+   *
+   * @param sinkPeer
+   *          The SinkPeer that successfully replicated a chunk of data
    */
-  public long getLastRegionServerUpdate() {
-    return lastRegionServerUpdate;
+  protected synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
+    badReportCounts.remove(sinkPeer.getServerName());
+  }
+
+  @VisibleForTesting
+  List<ServerName> getSinkServers() {
+    return sinkServers;
   }
 
   /**
@@ -214,22 +302,39 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
     private final HBaseReplicationEndpoint replicationEndpoint;
     private final String regionServerListNode;
 
-    public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) {
-      super(replicationPeer.getZkw());
-      this.replicationEndpoint = replicationPeer;
-      this.regionServerListNode = replicationEndpoint.getZkw().getZNodePaths().rsZNode;
+    public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) {
+      super(endpoint.zkw);
+      this.replicationEndpoint = endpoint;
+      this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode;
     }
 
     @Override
     public synchronized void nodeChildrenChanged(String path) {
       if (path.equals(regionServerListNode)) {
-        try {
-          LOG.info("Detected change to peer region servers, fetching updated list");
-          replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw()));
-        } catch (KeeperException e) {
-          LOG.error("Error reading slave addresses", e);
-        }
+        LOG.info("Detected change to peer region servers, fetching updated list");
+        replicationEndpoint.chooseSinks();
       }
     }
   }
+
+  /**
+   * Wraps a replication region server sink to provide the ability to identify it.
+   */
+  public static class SinkPeer {
+    private ServerName serverName;
+    private AsyncRegionServerAdmin regionServer;
+
+    public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
+      this.serverName = serverName;
+      this.regionServer = regionServer;
+    }
+
+    ServerName getServerName() {
+      return serverName;
+    }
+
+    public AsyncRegionServerAdmin getRegionServer() {
+      return regionServer;
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 4e0669c..b6e1f69 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -41,7 +41,6 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CellUtil;
@@ -60,7 +59,6 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -100,8 +98,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY =
       "hbase.replication.drop.on.deleted.columnfamily";
 
-  private AsyncClusterConnection conn;
-  private Configuration conf;
   // How long should we sleep for each retry
   private long sleepForRetries;
   // Maximum number of retries before taking bold actions
@@ -114,8 +110,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private int replicationRpcLimit;
   //Metrics for this source
   private MetricsSource metrics;
-  // Handles connecting to peer region servers
-  private ReplicationSinkManager replicationSinkMgr;
   private boolean peersSelected = false;
   private String replicationClusterId = "";
   private ThreadPoolExecutor exec;
@@ -130,25 +124,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   //Initialising as 0 to guarantee at least one logging message
   private long lastSinkFetchTime = 0;
 
-  /*
-   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
-   * Connection implementations, or initialize it in a different way, so defining createConnection
-   * as protected for possible overridings.
-   */
-  protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
-    return ClusterConnectionFactory.createAsyncClusterConnection(conf,
-      null, User.getCurrent());
-  }
-
-  /*
-   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
-   * ReplicationSinkManager implementations, or initialize it in a different way,
-   * so defining createReplicationSinkManager as protected for possible overridings.
-   */
-  protected ReplicationSinkManager createReplicationSinkManager(AsyncClusterConnection conn) {
-    return new ReplicationSinkManager(conn, this, this.conf);
-  }
-
   @Override
   public void init(Context context) throws IOException {
     super.init(context);
@@ -171,8 +146,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
-    // ReplicationQueueInfo parses the peerId out of the znode for us
-    this.replicationSinkMgr = createReplicationSinkManager(conn);
     // per sink thread pool
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
@@ -211,14 +184,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   }
 
   private void connectToPeers() {
-    getRegionServers();
-
     int sleepMultiplier = 1;
-
     // Connect to peer cluster first, unless we have to stop
-    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
-      replicationSinkMgr.chooseSinks();
-      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
+    while (this.isRunning() && getNumSinks() == 0) {
+      chooseSinks();
+      if (this.isRunning() && getNumSinks() == 0) {
         if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -253,7 +223,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   }
 
   private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
-    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
+    int numSinks = Math.max(getNumSinks(), 1);
     int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
     List<List<Entry>> entryLists =
         Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
@@ -513,7 +483,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       peersSelected = true;
     }
 
-    int numSinks = replicationSinkMgr.getNumSinks();
+    int numSinks = getNumSinks();
     if (numSinks == 0) {
       if((System.currentTimeMillis() - lastSinkFetchTime) >= (maxRetriesMultiplier*1000)) {
         LOG.warn(
@@ -561,7 +531,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           } else {
             LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
                 ioe);
-            replicationSinkMgr.chooseSinks();
+            chooseSinks();
           }
         } else {
           if (ioe instanceof SocketTimeoutException) {
@@ -574,7 +544,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
               this.socketTimeoutMultiplier);
           } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
             LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
-            replicationSinkMgr.chooseSinks();
+            chooseSinks();
           } else {
             LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
           }
@@ -629,7 +599,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
           logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
       }
-      sinkPeer = replicationSinkMgr.getReplicationSink();
+      sinkPeer = getReplicationSink();
       AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
       try {
         ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
@@ -644,10 +614,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         }
         throw e;
       }
-      replicationSinkMgr.reportSinkSuccess(sinkPeer);
+      reportSinkSuccess(sinkPeer);
     } catch (IOException ioe) {
       if (sinkPeer != null) {
-        replicationSinkMgr.reportBadSink(sinkPeer);
+        reportBadSink(sinkPeer);
       }
       throw ioe;
     }
@@ -683,5 +653,4 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private String logPeerId(){
     return "[Source for peer " + this.ctx.getPeerId() + "]:";
   }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
deleted file mode 100644
index db12dc0..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
-import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-
-/**
- * Maintains a collection of peers to replicate to, and randomly selects a
- * single peer to replicate to per set of data to replicate. Also handles
- * keeping track of peer availability.
- */
-@InterfaceAudience.Private
-public class ReplicationSinkManager {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkManager.class);
-
-  /**
-   * Default maximum number of times a replication sink can be reported as bad before
-   * it will no longer be provided as a sink for replication without the pool of
-   * replication sinks being refreshed.
-   */
-  static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
-
-  /**
-   * Default ratio of the total number of peer cluster region servers to consider
-   * replicating to.
-   */
-  static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
-
-
-  private final AsyncClusterConnection conn;
-
-  private final HBaseReplicationEndpoint endpoint;
-
-  // Count of "bad replication sink" reports per peer sink
-  private final Map<ServerName, Integer> badReportCounts;
-
-  // Ratio of total number of potential peer region servers to be used
-  private final float ratio;
-
-  // Maximum number of times a sink can be reported as bad before the pool of
-  // replication sinks is refreshed
-  private final int badSinkThreshold;
-
-  // A timestamp of the last time the list of replication peers changed
-  private long lastUpdateToPeers;
-
-  // The current pool of sinks to which replication can be performed
-  private List<ServerName> sinks = Lists.newArrayList();
-
-  /**
-   * Instantiate for a single replication peer cluster.
-   * @param conn connection to the peer cluster
-   * @param endpoint replication endpoint for inter cluster replication
-   * @param conf HBase configuration, used for determining replication source ratio and bad peer
-   *          threshold
-   */
-  public ReplicationSinkManager(AsyncClusterConnection conn, HBaseReplicationEndpoint endpoint,
-      Configuration conf) {
-    this.conn = conn;
-    this.endpoint = endpoint;
-    this.badReportCounts = Maps.newHashMap();
-    this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
-    this.badSinkThreshold =
-      conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
-  }
-
-  /**
-   * Get a randomly-chosen replication sink to replicate to.
-   * @return a replication sink to replicate to
-   */
-  public synchronized SinkPeer getReplicationSink() throws IOException {
-    if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
-      LOG.info("Current list of sinks is out of date or empty, updating");
-      chooseSinks();
-    }
-
-    if (sinks.isEmpty()) {
-      throw new IOException("No replication sinks are available");
-    }
-    ServerName serverName = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size()));
-    return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
-  }
-
-  /**
-   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
-   * failed). If a single SinkPeer is reported as bad more than
-   * replication.bad.sink.threshold times, it will be removed
-   * from the pool of potential replication targets.
-   *
-   * @param sinkPeer
-   *          The SinkPeer that had a failed replication attempt on it
-   */
-  public synchronized void reportBadSink(SinkPeer sinkPeer) {
-    ServerName serverName = sinkPeer.getServerName();
-    int badReportCount = (badReportCounts.containsKey(serverName)
-                    ? badReportCounts.get(serverName) : 0) + 1;
-    badReportCounts.put(serverName, badReportCount);
-    if (badReportCount > badSinkThreshold) {
-      this.sinks.remove(serverName);
-      if (sinks.isEmpty()) {
-        chooseSinks();
-      }
-    }
-  }
-
-  /**
-   * Report that a {@code SinkPeer} successfully replicated a chunk of data.
-   *
-   * @param sinkPeer
-   *          The SinkPeer that had a failed replication attempt on it
-   */
-  public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
-    badReportCounts.remove(sinkPeer.getServerName());
-  }
-
-  /**
-   * Refresh the list of sinks.
-   */
-  public synchronized void chooseSinks() {
-    List<ServerName> slaveAddresses = endpoint.getRegionServers();
-    if(slaveAddresses.isEmpty()){
-      LOG.warn("No sinks available at peer. Will not be able to replicate");
-    }
-    Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
-    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
-    sinks = slaveAddresses.subList(0, numSinks);
-    lastUpdateToPeers = System.currentTimeMillis();
-    badReportCounts.clear();
-  }
-
-  public synchronized int getNumSinks() {
-    return sinks.size();
-  }
-
-  @VisibleForTesting
-  protected List<ServerName> getSinksForTesting() {
-    return Collections.unmodifiableList(sinks);
-  }
-
-  /**
-   * Wraps a replication region server sink to provide the ability to identify
-   * it.
-   */
-  public static class SinkPeer {
-    private ServerName serverName;
-    private AsyncRegionServerAdmin regionServer;
-
-    public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
-      this.serverName = serverName;
-      this.regionServer = regionServer;
-    }
-
-    ServerName getServerName() {
-      return serverName;
-    }
-
-    public AsyncRegionServerAdmin getRegionServer() {
-      return regionServer;
-    }
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
new file mode 100644
index 0000000..4160141
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category({ReplicationTests.class, SmallTests.class})
+public class TestHBaseReplicationEndpoint {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHBaseReplicationEndpoint.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestHBaseReplicationEndpoint.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private HBaseReplicationEndpoint endpoint;
+
+  @Before
+  public void setUp() throws Exception {
+    try {
+      ReplicationEndpoint.Context context =
+        new ReplicationEndpoint.Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(),
+          null, null, null, null, null, null, null);
+      endpoint = new DummyHBaseReplicationEndpoint();
+      endpoint.init(context);
+    } catch (Exception e) {
+      LOG.info("Failed", e);
+    }
+  }
+
+  @Test
+  public void testChooseSinks() {
+    List<ServerName> serverNames = Lists.newArrayList();
+    int totalServers = 20;
+    for (int i = 0; i < totalServers; i++) {
+      serverNames.add(mock(ServerName.class));
+    }
+    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
+    endpoint.chooseSinks();
+    int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
+    assertEquals(expected, endpoint.getNumSinks());
+  }
+
+  @Test
+  public void testChooseSinksLessThanRatioAvailable() {
+    List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
+      mock(ServerName.class));
+    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
+    endpoint.chooseSinks();
+    assertEquals(1, endpoint.getNumSinks());
+  }
+
+  @Test
+  public void testReportBadSink() {
+    ServerName serverNameA = mock(ServerName.class);
+    ServerName serverNameB = mock(ServerName.class);
+    ((DummyHBaseReplicationEndpoint) endpoint)
+      .setRegionServers(Lists.newArrayList(serverNameA, serverNameB));
+    endpoint.chooseSinks();
+    // Sanity check
+    assertEquals(1, endpoint.getNumSinks());
+
+    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+    endpoint.reportBadSink(sinkPeer);
+    // Just reporting a bad sink once shouldn't have an effect
+    assertEquals(1, endpoint.getNumSinks());
+  }
+
+  /**
+   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
+   * be replicated to anymore.
+   */
+  @Test
+  public void testReportBadSinkPastThreshold() {
+    List<ServerName> serverNames = Lists.newArrayList();
+    int totalServers = 30;
+    for (int i = 0; i < totalServers; i++) {
+      serverNames.add(mock(ServerName.class));
+    }
+    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
+    endpoint.chooseSinks();
+    // Sanity check
+    int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
+    assertEquals(expected, endpoint.getNumSinks());
+
+    ServerName badSinkServer0 = endpoint.getSinkServers().get(0);
+    SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class));
+    for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
+      endpoint.reportBadSink(sinkPeer);
+    }
+    // Reporting a bad sink more than the threshold count should remove it
+    // from the list of potential sinks
+    assertEquals(expected - 1, endpoint.getNumSinks());
+
+    // now try a sink that has some successes
+    ServerName badSinkServer1 = endpoint.getSinkServers().get(0);
+    sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class));
+    for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
+      endpoint.reportBadSink(sinkPeer);
+    }
+    endpoint.reportSinkSuccess(sinkPeer); // one success
+    endpoint.reportBadSink(sinkPeer);
+    // did not remove the sink, since we had one successful try
+    assertEquals(expected - 1, endpoint.getNumSinks());
+
+    for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD - 1; i++) {
+      endpoint.reportBadSink(sinkPeer);
+    }
+    // still not removed, since the success reset the counter
+    assertEquals(expected - 1, endpoint.getNumSinks());
+    endpoint.reportBadSink(sinkPeer);
+    // but we exhausted the tries
+    assertEquals(expected - 2, endpoint.getNumSinks());
+  }
+
+  @Test
+  public void testReportBadSinkDownToZeroSinks() {
+    List<ServerName> serverNames = Lists.newArrayList();
+    int totalServers = 4;
+    for (int i = 0; i < totalServers; i++) {
+      serverNames.add(mock(ServerName.class));
+    }
+    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
+    endpoint.chooseSinks();
+    // Sanity check
+    int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
+    assertEquals(expected, endpoint.getNumSinks());
+
+    ServerName serverNameA = endpoint.getSinkServers().get(0);
+    ServerName serverNameB = endpoint.getSinkServers().get(1);
+
+    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
+
+    for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
+      endpoint.reportBadSink(sinkPeerA);
+      endpoint.reportBadSink(sinkPeerB);
+    }
+
+    // We've gone down to 0 good sinks, so the replication sinks should have been refreshed
+    // by now. Of the 4 servers, the 2 that were reported as bad are no longer considered,
+    // so the expected sink count is computed from the remaining servers only.
+    expected =
+      (int) ((totalServers - 2) * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
+    assertEquals(expected, endpoint.getNumSinks());
+  }
+
+  private static class DummyHBaseReplicationEndpoint extends HBaseReplicationEndpoint {
+
+    List<ServerName> regionServers;
+
+    public void setRegionServers(List<ServerName> regionServers) {
+      this.regionServers = regionServers;
+    }
+
+    @Override
+    public List<ServerName> fetchSlavesAddresses() {
+      return regionServers;
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      return false;
+    }
+
+    @Override
+    public AsyncClusterConnection createConnection(Configuration conf) throws IOException {
+      return null;
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
deleted file mode 100644
index f8a2ab9..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
-import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-@Category({ReplicationTests.class, SmallTests.class})
-public class TestReplicationSinkManager {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestReplicationSinkManager.class);
-
-  private ReplicationSinkManager sinkManager;
-  private HBaseReplicationEndpoint replicationEndpoint;
-
-  /**
-   * Manage the 'getRegionServers' for the tests below. Override the base class handling
-   * of Regionservers. We used to use a mock for this but updated guava/errorprone disallows
-   * mocking of classes that implement Service.
-   */
-  private static class SetServersHBaseReplicationEndpoint extends HBaseReplicationEndpoint {
-    List<ServerName> regionServers;
-
-    @Override
-    public boolean replicate(ReplicateContext replicateContext) {
-      return false;
-    }
-
-    @Override
-    public synchronized void setRegionServers(List<ServerName> regionServers) {
-      this.regionServers = regionServers;
-    }
-
-    @Override
-    public List<ServerName> getRegionServers() {
-      return this.regionServers;
-    }
-  }
-
-  @Before
-  public void setUp() {
-    this.replicationEndpoint = new SetServersHBaseReplicationEndpoint();
-    this.sinkManager = new ReplicationSinkManager(mock(AsyncClusterConnection.class),
-      replicationEndpoint, new Configuration());
-  }
-
-  @Test
-  public void testChooseSinks() {
-    List<ServerName> serverNames = Lists.newArrayList();
-    int totalServers = 20;
-    for (int i = 0; i < totalServers; i++) {
-      serverNames.add(mock(ServerName.class));
-    }
-    replicationEndpoint.setRegionServers(serverNames);
-    sinkManager.chooseSinks();
-    int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
-    assertEquals(expected, sinkManager.getNumSinks());
-
-  }
-
-  @Test
-  public void testChooseSinks_LessThanRatioAvailable() {
-    List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
-      mock(ServerName.class));
-    replicationEndpoint.setRegionServers(serverNames);
-    sinkManager.chooseSinks();
-    assertEquals(1, sinkManager.getNumSinks());
-  }
-
-  @Test
-  public void testReportBadSink() {
-    ServerName serverNameA = mock(ServerName.class);
-    ServerName serverNameB = mock(ServerName.class);
-    replicationEndpoint.setRegionServers(Lists.newArrayList(serverNameA, serverNameB));
-    sinkManager.chooseSinks();
-    // Sanity check
-    assertEquals(1, sinkManager.getNumSinks());
-
-    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
-
-    sinkManager.reportBadSink(sinkPeer);
-
-    // Just reporting a bad sink once shouldn't have an effect
-    assertEquals(1, sinkManager.getNumSinks());
-
-  }
-
-  /**
-   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
-   * be replicated to anymore.
-   */
-  @Test
-  public void testReportBadSink_PastThreshold() {
-    List<ServerName> serverNames = Lists.newArrayList();
-    int totalServers = 30;
-    for (int i = 0; i < totalServers; i++) {
-      serverNames.add(mock(ServerName.class));
-    }
-    replicationEndpoint.setRegionServers(serverNames);
-    sinkManager.chooseSinks();
-    // Sanity check
-    int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
-    assertEquals(expected, sinkManager.getNumSinks());
-
-    ServerName serverName = sinkManager.getSinksForTesting().get(0);
-
-    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
-
-    sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
-    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
-      sinkManager.reportBadSink(sinkPeer);
-    }
-
-    // Reporting a bad sink more than the threshold count should remove it
-    // from the list of potential sinks
-    assertEquals(expected - 1, sinkManager.getNumSinks());
-
-    //
-    // now try a sink that has some successes
-    //
-    serverName = sinkManager.getSinksForTesting().get(0);
-
-    sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
-    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
-      sinkManager.reportBadSink(sinkPeer);
-    }
-    sinkManager.reportSinkSuccess(sinkPeer); // one success
-    sinkManager.reportBadSink(sinkPeer);
-
-    // did not remove the sink, since we had one successful try
-    assertEquals(expected - 1, sinkManager.getNumSinks());
-
-    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
-      sinkManager.reportBadSink(sinkPeer);
-    }
-    // still not remove, since the success reset the counter
-    assertEquals(expected - 1, sinkManager.getNumSinks());
-
-    sinkManager.reportBadSink(sinkPeer);
-    // but we exhausted the tries
-    assertEquals(expected - 2, sinkManager.getNumSinks());
-  }
-
-  @Test
-  public void testReportBadSink_DownToZeroSinks() {
-    List<ServerName> serverNames = Lists.newArrayList();
-    int totalServers = 4;
-    for (int i = 0; i < totalServers; i++) {
-      serverNames.add(mock(ServerName.class));
-    }
-    replicationEndpoint.setRegionServers(serverNames);
-    sinkManager.chooseSinks();
-    // Sanity check
-    List<ServerName> sinkList = sinkManager.getSinksForTesting();
-    int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
-    assertEquals(expected, sinkList.size());
-
-    ServerName serverNameA = sinkList.get(0);
-    ServerName serverNameB = sinkList.get(1);
-
-    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
-    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
-
-    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
-      sinkManager.reportBadSink(sinkPeerA);
-      sinkManager.reportBadSink(sinkPeerB);
-    }
-
-    // We've gone down to 0 good sinks, so the replication sinks
-    // should have been refreshed now, so out of 4 servers, 2 are not considered as they are
-    // reported as bad.
-    expected = (int) ((totalServers - 2) * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
-    assertEquals(expected, sinkManager.getNumSinks());
-  }
-
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
index 3c88ab3..0901291 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
@@ -175,14 +174,9 @@ public class TestSerialReplicationEndpoint {
     }
 
     @Override
-    public synchronized List<ServerName> getRegionServers() {
+    public synchronized int getNumSinks() {
       // Return multiple server names for endpoint parallel replication.
-      return new ArrayList<>(
-          ImmutableList.of(ServerName.valueOf("www.example.com", 12016, 1525245876026L),
-            ServerName.valueOf("www.example2.com", 12016, 1525245876026L),
-            ServerName.valueOf("www.example3.com", 12016, 1525245876026L),
-            ServerName.valueOf("www.example4.com", 12016, 1525245876026L),
-            ServerName.valueOf("www.example4.com", 12016, 1525245876026L)));
+      return 10;
     }
   }
 }