Posted to commits@hbase.apache.org by su...@apache.org on 2021/07/09 06:56:04 UTC

[hbase] 10/12: HBASE-25113 [testing] HBaseCluster support ReplicationServer for UTs (#2662)

This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 6ae60979a8a61c79c737f7a65d9b5318230b2537
Author: XinSun <dd...@gmail.com>
AuthorDate: Mon Nov 23 11:01:55 2020 +0800

    HBASE-25113 [testing] HBaseCluster support ReplicationServer for UTs (#2662)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../org/apache/hadoop/hbase/LocalHBaseCluster.java | 63 ++++++++++++++++++-
 .../hbase/replication/HReplicationServer.java      | 13 ++++
 .../apache/hadoop/hbase/util/JVMClusterUtil.java   | 57 +++++++++++++++++-
 .../apache/hadoop/hbase/HBaseTestingUtility.java   |  8 +--
 .../org/apache/hadoop/hbase/MiniHBaseCluster.java  | 70 ++++++++++++++++++----
 .../hadoop/hbase/StartMiniClusterOption.java       | 24 ++++++--
 .../replication/TestReplicationServerSink.java     | 45 +++++++-------
 hbase-server/src/test/resources/hbase-site.xml     |  7 +++
 8 files changed, 242 insertions(+), 45 deletions(-)
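
For context, the patch below wires an in-process HReplicationServer into the mini cluster so unit tests can start one alongside masters and region servers. A minimal usage sketch, assuming the APIs added on the HBASE-24666 branch by this commit (StartMiniClusterOption.Builder#numReplicationServers, MiniHBaseCluster#getReplicationServerThreads) and a TEST_UTIL field of type HBaseTestingUtility as used in TestReplicationServerSink:

    // Sketch only: start a mini cluster with one replication server
    // (branch-specific APIs introduced by this patch).
    Configuration conf = TEST_UTIL.getConfiguration();
    // Flag set in TestReplicationServerSink on this branch.
    conf.setBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, true);
    StartMiniClusterOption option = StartMiniClusterOption.builder()
        .numReplicationServers(1)
        .build();
    TEST_UTIL.startMiniCluster(option);
    // Retrieve the replication server the same way the updated test does.
    HReplicationServer replicationServer = TEST_UTIL.getMiniHBaseCluster()
        .getReplicationServerThreads().get(0).getReplicationServer();
    ServerName replicationServerName = replicationServer.getServerName();
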

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
index f4847b9..24b658f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
@@ -32,9 +32,11 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.HReplicationServer;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.ReplicationServerThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -60,7 +62,10 @@ import org.slf4j.LoggerFactory;
 public class LocalHBaseCluster {
   private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class);
   private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>();
-  private final List<JVMClusterUtil.RegionServerThread> regionThreads = new CopyOnWriteArrayList<>();
+  private final List<JVMClusterUtil.RegionServerThread> regionThreads =
+      new CopyOnWriteArrayList<>();
+  private final List<JVMClusterUtil.ReplicationServerThread> replicationThreads =
+      new CopyOnWriteArrayList<>();
   private final static int DEFAULT_NO = 1;
   /** local mode */
   public static final String LOCAL = "local";
@@ -259,6 +264,26 @@ public class LocalHBaseCluster {
         });
   }
 
+  @SuppressWarnings("unchecked")
+  public JVMClusterUtil.ReplicationServerThread addReplicationServer(
+      Configuration config, final int index) throws IOException {
+    // Create each replication server with its own Configuration instance so each has
+    // its Connection instance rather than share (see HBASE_INSTANCES down in
+    // the guts of ConnectionManager).
+    JVMClusterUtil.ReplicationServerThread rst =
+        JVMClusterUtil.createReplicationServerThread(config, index);
+    this.replicationThreads.add(rst);
+    return rst;
+  }
+
+  public JVMClusterUtil.ReplicationServerThread addReplicationServer(
+      final Configuration config, final int index, User user)
+      throws IOException, InterruptedException {
+    return user.runAs(
+        (PrivilegedExceptionAction<ReplicationServerThread>) () -> addReplicationServer(config,
+            index));
+  }
+
   /**
    * @param serverNumber
    * @return region server
@@ -290,6 +315,40 @@ public class LocalHBaseCluster {
   }
 
   /**
+   * @param serverNumber replication server number
+   * @return replication server
+   */
+  public HReplicationServer getReplicationServer(int serverNumber) {
+    return replicationThreads.get(serverNumber).getReplicationServer();
+  }
+
+  /**
+   * @return Read-only list of replication server threads.
+   */
+  public List<JVMClusterUtil.ReplicationServerThread> getReplicationServers() {
+    return Collections.unmodifiableList(this.replicationThreads);
+  }
+
+  /**
+   * @return List of running servers (Some servers may have been killed or
+   *   aborted during lifetime of cluster; these servers are not included in this
+   *   list).
+   */
+  public List<JVMClusterUtil.ReplicationServerThread> getLiveReplicationServers() {
+    List<JVMClusterUtil.ReplicationServerThread> liveServers = new ArrayList<>();
+    List<ReplicationServerThread> list = getReplicationServers();
+    for (JVMClusterUtil.ReplicationServerThread rst: list) {
+      if (rst.isAlive()) {
+        liveServers.add(rst);
+      }
+      else {
+        LOG.info("Not alive {}", rst.getName());
+      }
+    }
+    return liveServers;
+  }
+
+  /**
    * @return the Configuration used by this LocalHBaseCluster
    */
   public Configuration getConfiguration() {
@@ -430,7 +489,7 @@ public class LocalHBaseCluster {
    * Start the cluster.
    */
   public void startup() throws IOException {
-    JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
+    JVMClusterUtil.startup(this.masterThreads, this.regionThreads, this.replicationThreads);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index 2d0336d..8d85b85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -443,6 +443,19 @@ public class HReplicationServer extends Thread implements Server, ReplicationSou
     return this.stopped;
   }
 
+  public void waitForServerOnline(){
+    while (!isStopped() && !isOnline()) {
+      synchronized (online) {
+        try {
+          online.wait(msgInterval);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+
   /**
    * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
    * be hooked up to WAL.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
index 1e2ac3e..1f76864 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.replication.HReplicationServer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +73,33 @@ public class JVMClusterUtil {
   }
 
   /**
+   * Datastructure to hold ReplicationServer Thread and ReplicationServer instance
+   */
+  public static class ReplicationServerThread extends Thread {
+    private final HReplicationServer replicationServer;
+
+    public ReplicationServerThread(final HReplicationServer r, final int index) {
+      super(r, "ReplicationServer:" + index + ";" + r.getServerName().toShortString());
+      this.replicationServer = r;
+    }
+
+    /**
+     * @return the replication server
+     */
+    public HReplicationServer getReplicationServer() {
+      return this.replicationServer;
+    }
+
+    /**
+     * Block until the replication server has come online, indicating it is ready
+     * to be used.
+     */
+    public void waitForServerOnline() {
+      replicationServer.waitForServerOnline();
+    }
+  }
+
+  /**
    * Creates a {@link RegionServerThread}.
    * Call 'start' on the returned thread to make it run.
    * @param c Configuration to use.
@@ -98,6 +126,24 @@ public class JVMClusterUtil {
     return new JVMClusterUtil.RegionServerThread(server, index);
   }
 
+  /**
+   * Creates a {@link ReplicationServerThread}.
+   * Call 'start' on the returned thread to make it run.
+   * @param c Configuration to use.
+   * @param index Used distinguishing the object returned.
+   * @throws IOException
+   * @return Replication server added.
+   */
+  public static JVMClusterUtil.ReplicationServerThread createReplicationServerThread(
+      final Configuration c, final int index) throws IOException {
+    HReplicationServer server;
+    try {
+      server = new HReplicationServer(c);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return new JVMClusterUtil.ReplicationServerThread(server, index);
+  }
 
   /**
    * Datastructure to hold Master Thread and Master instance
@@ -122,7 +168,7 @@ public class JVMClusterUtil {
    * @param c Configuration to use.
    * @param hmc Class to create.
    * @param index Used distinguishing the object returned.
-   * @throws IOException
+   * @throws IOException exception
    * @return Master added.
    */
   public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
@@ -165,7 +211,8 @@ public class JVMClusterUtil {
    * @return Address to use contacting primary master.
    */
   public static String startup(final List<JVMClusterUtil.MasterThread> masters,
-      final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
+      final List<JVMClusterUtil.RegionServerThread> regionservers,
+      final List<JVMClusterUtil.ReplicationServerThread> replicationServers) throws IOException {
     // Implementation note: This method relies on timed sleeps in a loop. It's not great, and
     // should probably be re-written to use actual synchronization objects, but it's ok for now
 
@@ -193,6 +240,12 @@ public class JVMClusterUtil {
       }
     }
 
+    if (replicationServers != null) {
+      for (JVMClusterUtil.ReplicationServerThread t: replicationServers) {
+        t.start();
+      }
+    }
+
     // Wait for an active master to be initialized (implies being master)
     //  with this, when we return the cluster is complete
     final int initTimeout = configuration != null ? Integer.parseInt(
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 840b9e0..fb31b3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1111,8 +1111,8 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
 
     Configuration c = new Configuration(this.conf);
     this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
-      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
-      option.getMasterClass(), option.getRsClass());
+        option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
+        option.getNumReplicationServers(), option.getMasterClass(), option.getRsClass());
     // Populate the master address configuration from mini cluster configuration.
     conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
     // Don't leave here till we've done a successful scan of the hbase:meta
@@ -1237,8 +1237,8 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
     closeConnection();
     this.hbaseCluster =
         new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
-            option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
-            option.getRsClass());
+            option.getNumRegionServers(), option.getRsPorts(), option.getNumReplicationServers(),
+            option.getMasterClass(), option.getRsClass());
     // Don't leave here till we've done a successful scan of the hbase:meta
     Connection conn = ConnectionFactory.createConnection(this.conf);
     Table t = conn.getTable(TableName.META_TABLE_NAME);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index 990867e..eb71623 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -32,12 +32,14 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.HReplicationServer;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.ReplicationServerThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -87,10 +89,10 @@ public class MiniHBaseCluster extends HBaseCluster {
    * @param numRegionServers initial number of region servers to start.
    */
   public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
-         Class<? extends HMaster> masterClass,
-         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+      Class<? extends HMaster> masterClass,
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
       throws IOException, InterruptedException {
-    this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
+    this(conf, numMasters, 0, numRegionServers, null, 0, masterClass, regionserverClass);
   }
 
   /**
@@ -98,20 +100,22 @@ public class MiniHBaseCluster extends HBaseCluster {
    *   restart where for sure the regionservers come up on same address+port (but
    *   just with different startcode); by default mini hbase clusters choose new
    *   arbitrary ports on each cluster start.
+   * @param numReplicationServers initial number of replication servers to start.
    * @throws IOException
    * @throws InterruptedException
    */
   public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
-         int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
-         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+      int numRegionServers, List<Integer> rsPorts, int numReplicationServers,
+      Class<? extends HMaster> masterClass,
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
       throws IOException, InterruptedException {
     super(conf);
 
     // Hadoop 2
     CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
 
-    init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
-        regionserverClass);
+    init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, numReplicationServers,
+        masterClass, regionserverClass);
     this.initialClusterStatus = getClusterMetrics();
   }
 
@@ -228,7 +232,8 @@ public class MiniHBaseCluster extends HBaseCluster {
   }
 
   private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
-      final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
+      final int nRegionNodes, List<Integer> rsPorts, int numReplicationServers,
+      Class<? extends HMaster> masterClass,
       Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
   throws IOException, InterruptedException {
     try {
@@ -249,11 +254,17 @@ public class MiniHBaseCluster extends HBaseCluster {
         if (rsPorts != null) {
           rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
         }
-        User user = HBaseTestingUtility.getDifferentUser(rsConf,
-            ".hfs."+index++);
+        User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++);
         hbaseCluster.addRegionServer(rsConf, i, user);
       }
 
+      // manually add the replication servers as other users
+      for (int i = 0; i < numReplicationServers; i++) {
+        Configuration rsConf = HBaseConfiguration.create(conf);
+        User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++);
+        hbaseCluster.addReplicationServer(rsConf, i, user);
+      }
+
       hbaseCluster.startup();
     } catch (IOException e) {
       shutdown();
@@ -792,7 +803,7 @@ public class MiniHBaseCluster extends HBaseCluster {
 
   /**
    * Grab a numbered region server of your choice.
-   * @param serverNumber
+   * @param serverNumber region server number
    * @return region server
    */
   public HRegionServer getRegionServer(int serverNumber) {
@@ -806,6 +817,43 @@ public class MiniHBaseCluster extends HBaseCluster {
         .findFirst().orElse(null);
   }
 
+  /**
+   * @return Number of live replication servers in the cluster currently.
+   */
+  public int getNumLiveReplicationServers() {
+    return this.hbaseCluster.getLiveReplicationServers().size();
+  }
+
+  /**
+   * @return List of replication server threads.
+   */
+  public List<JVMClusterUtil.ReplicationServerThread> getReplicationServerThreads() {
+    return this.hbaseCluster.getReplicationServers();
+  }
+
+  /**
+   * @return List of live replication server threads (skips the aborted and the killed)
+   */
+  public List<JVMClusterUtil.ReplicationServerThread> getLiveReplicationServerThreads() {
+    return this.hbaseCluster.getLiveReplicationServers();
+  }
+
+  /**
+   * Grab a numbered replication server of your choice.
+   * @param serverNumber
+   * @return replication server
+   */
+  public HReplicationServer getReplicationServer(int serverNumber) {
+    return hbaseCluster.getReplicationServer(serverNumber);
+  }
+
+  public HReplicationServer getReplicationServer(ServerName serverName) {
+    return hbaseCluster.getReplicationServers().stream()
+        .map(ReplicationServerThread::getReplicationServer)
+        .filter(r -> r.getServerName().equals(serverName))
+        .findFirst().orElse(null);
+  }
+
   public List<HRegion> getRegions(byte[] tableName) {
     return getRegions(TableName.valueOf(tableName));
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
index 7a9bd68..0aa35ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
@@ -75,6 +75,10 @@ public final class StartMiniClusterOption {
    * The class to use as HRegionServer, or null for default.
    */
   private Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass;
+  /**
+   * Number of replication servers to start up.
+   */
+  private final int numReplicationServers;
 
   /**
    * Number of datanodes. Used to create mini DSF cluster. Surpassed by {@link #dataNodeHosts} size.
@@ -109,7 +113,8 @@ public final class StartMiniClusterOption {
    */
   private StartMiniClusterOption(int numMasters, int numAlwaysStandByMasters,
       Class<? extends HMaster> masterClass, int numRegionServers, List<Integer> rsPorts,
-      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, int numDataNodes,
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
+      int numReplicationServers, int numDataNodes,
       String[] dataNodeHosts, int numZkServers, boolean createRootDir, boolean createWALDir) {
     this.numMasters = numMasters;
     this.numAlwaysStandByMasters = numAlwaysStandByMasters;
@@ -117,6 +122,7 @@ public final class StartMiniClusterOption {
     this.numRegionServers = numRegionServers;
     this.rsPorts = rsPorts;
     this.rsClass = rsClass;
+    this.numReplicationServers = numReplicationServers;
     this.numDataNodes = numDataNodes;
     this.dataNodeHosts = dataNodeHosts;
     this.numZkServers = numZkServers;
@@ -148,6 +154,10 @@ public final class StartMiniClusterOption {
     return rsClass;
   }
 
+  public int getNumReplicationServers() {
+    return numReplicationServers;
+  }
+
   public int getNumDataNodes() {
     return numDataNodes;
   }
@@ -196,6 +206,7 @@ public final class StartMiniClusterOption {
     private Class<? extends HMaster> masterClass = null;
     private int numRegionServers = 1;
     private List<Integer> rsPorts = null;
+    private int numReplicationServers;
     private Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass = null;
     private int numDataNodes = 1;
     private String[] dataNodeHosts = null;
@@ -210,9 +221,9 @@ public final class StartMiniClusterOption {
       if (dataNodeHosts != null && dataNodeHosts.length != 0) {
         numDataNodes = dataNodeHosts.length;
       }
-      return new StartMiniClusterOption(numMasters,numAlwaysStandByMasters, masterClass,
-          numRegionServers, rsPorts, rsClass, numDataNodes, dataNodeHosts, numZkServers,
-          createRootDir, createWALDir);
+      return new StartMiniClusterOption(numMasters, numAlwaysStandByMasters, masterClass,
+          numRegionServers, rsPorts, rsClass, numReplicationServers,
+          numDataNodes, dataNodeHosts, numZkServers, createRootDir, createWALDir);
     }
 
     public Builder numMasters(int numMasters) {
@@ -269,6 +280,11 @@ public final class StartMiniClusterOption {
       this.createWALDir = createWALDir;
       return this;
     }
+
+    public Builder numReplicationServers(int numReplicationServers) {
+      this.numReplicationServers = numReplicationServers;
+      return this;
+    }
   }
 
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
index d97667b..a1cbebb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
@@ -77,8 +78,8 @@ public class TestReplicationServerSink {
 
   private static HMaster MASTER;
 
-  private static HReplicationServer replicationServer;
-  private static ServerName replicationServerName;
+  private static HReplicationServer REPLICATION_SERVER;
+  private static ServerName REPLICATION_SERVER_NAME;
 
   private static Path baseNamespaceDir;
   private static Path hfileArchiveDir;
@@ -93,9 +94,13 @@ public class TestReplicationServerSink {
   public static void beforeClass() throws Exception {
     CONF.setLong(HBASE_CLIENT_OPERATION_TIMEOUT, 1000);
     CONF.setLong(ONLINE_SERVER_REFRESH_INTERVAL, 10000);
-    TEST_UTIL.startMiniCluster();
+    CONF.setBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, true);
+    TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numReplicationServers(1).build());
     MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
     TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+    REPLICATION_SERVER = TEST_UTIL.getMiniHBaseCluster().getReplicationServerThreads().get(0)
+        .getReplicationServer();
+    REPLICATION_SERVER_NAME = REPLICATION_SERVER.getServerName();
 
     Path rootDir = CommonFSUtils.getRootDir(CONF);
     baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR));
@@ -110,11 +115,6 @@ public class TestReplicationServerSink {
 
   @Before
   public void before() throws Exception {
-    replicationServer = new HReplicationServer(CONF);
-    replicationServer.start();
-    TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline());
-    replicationServerName = replicationServer.getServerName();
-
     TEST_UTIL.createTable(TABLENAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLENAME);
   }
@@ -122,12 +122,6 @@ public class TestReplicationServerSink {
   @After
   public void after() throws IOException {
     TEST_UTIL.deleteTableIfAny(TABLENAME);
-    if (!replicationServer.isStopped()) {
-      replicationServer.stop("test");
-    }
-    TEST_UTIL.waitFor(10000, () -> !replicationServer.isAlive());
-    replicationServer = null;
-    replicationServerName = null;
   }
 
   /**
@@ -138,10 +132,10 @@ public class TestReplicationServerSink {
     AsyncClusterConnection conn =
         TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
     AsyncReplicationServerAdmin replAdmin =
-        conn.getReplicationServerAdmin(replicationServerName);
+        conn.getReplicationServerAdmin(REPLICATION_SERVER_NAME);
 
     ReplicationServerSinkPeer sinkPeer =
-        new ReplicationServerSinkPeer(replicationServerName, replAdmin);
+        new ReplicationServerSinkPeer(REPLICATION_SERVER_NAME, replAdmin);
     replicateWALEntryAndVerify(sinkPeer);
   }
 
@@ -178,23 +172,30 @@ public class TestReplicationServerSink {
     ReplicationServerManager replicationServerManager = MASTER.getReplicationServerManager();
     assertNotNull(replicationServerManager);
     TEST_UTIL.waitFor(60000, () -> !replicationServerManager.getOnlineServers().isEmpty()
-        && null != replicationServerManager.getServerMetrics(replicationServerName));
+        && null != replicationServerManager.getServerMetrics(REPLICATION_SERVER_NAME));
     // put data via replication server
     testReplicateWAL();
-    TEST_UTIL.waitFor(60000, () -> replicationServer.rpcServices.requestCount.sum() > 0
-        && replicationServer.rpcServices.requestCount.sum() == replicationServerManager
-        .getServerMetrics(replicationServerName).getRequestCount());
+    TEST_UTIL.waitFor(60000, () -> REPLICATION_SERVER.rpcServices.requestCount.sum() > 0
+        && REPLICATION_SERVER.rpcServices.requestCount.sum() == replicationServerManager
+        .getServerMetrics(REPLICATION_SERVER_NAME).getRequestCount());
   }
 
   @Test
   public void testReplicationServerExpire() throws Exception {
+    int initialNum = TEST_UTIL.getMiniHBaseCluster().getNumLiveReplicationServers();
+    HReplicationServer replicationServer = new HReplicationServer(CONF);
+    replicationServer.start();
+    ServerName replicationServerName = replicationServer.getServerName();
+
     ReplicationServerManager replicationServerManager = MASTER.getReplicationServerManager();
-    TEST_UTIL.waitFor(60000, () -> !replicationServerManager.getOnlineServers().isEmpty()
+    TEST_UTIL.waitFor(60000, () ->
+        initialNum + 1 == replicationServerManager.getOnlineServers().size()
         && null != replicationServerManager.getServerMetrics(replicationServerName));
 
     replicationServer.stop("test");
 
-    TEST_UTIL.waitFor(180000, 1000, replicationServerManager.getOnlineServers()::isEmpty);
+    TEST_UTIL.waitFor(180000, 1000, () ->
+        initialNum == replicationServerManager.getOnlineServers().size());
     assertNull(replicationServerManager.getServerMetrics(replicationServerName));
   }
 }
diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml
index 5e64bfc..36187a3 100644
--- a/hbase-server/src/test/resources/hbase-site.xml
+++ b/hbase-server/src/test/resources/hbase-site.xml
@@ -90,6 +90,13 @@
     </description>
   </property>
   <property>
+    <name>hbase.replicationserver.port</name>
+    <value>0</value>
+    <description>Always have replicationservers come up on port '0' so we don't clash over
+      default ports.
+    </description>
+  </property>
+  <property>
     <name>hbase.ipc.client.fallback-to-simple-auth-allowed</name>
     <value>true</value>
   </property>