You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/07/09 06:56:04 UTC
[hbase] 10/12: HBASE-25113 [testing] HBaseCluster support
ReplicationServer for UTs (#2662)
This is an automated email from the ASF dual-hosted git repository.
sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 6ae60979a8a61c79c737f7a65d9b5318230b2537
Author: XinSun <dd...@gmail.com>
AuthorDate: Mon Nov 23 11:01:55 2020 +0800
HBASE-25113 [testing] HBaseCluster support ReplicationServer for UTs (#2662)
Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
.../org/apache/hadoop/hbase/LocalHBaseCluster.java | 63 ++++++++++++++++++-
.../hbase/replication/HReplicationServer.java | 13 ++++
.../apache/hadoop/hbase/util/JVMClusterUtil.java | 57 +++++++++++++++++-
.../apache/hadoop/hbase/HBaseTestingUtility.java | 8 +--
.../org/apache/hadoop/hbase/MiniHBaseCluster.java | 70 ++++++++++++++++++----
.../hadoop/hbase/StartMiniClusterOption.java | 24 ++++++--
.../replication/TestReplicationServerSink.java | 45 +++++++-------
hbase-server/src/test/resources/hbase-site.xml | 7 +++
8 files changed, 242 insertions(+), 45 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
index f4847b9..24b658f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
@@ -32,9 +32,11 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.HReplicationServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.ReplicationServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -60,7 +62,10 @@ import org.slf4j.LoggerFactory;
public class LocalHBaseCluster {
private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class);
private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>();
- private final List<JVMClusterUtil.RegionServerThread> regionThreads = new CopyOnWriteArrayList<>();
+ private final List<JVMClusterUtil.RegionServerThread> regionThreads =
+ new CopyOnWriteArrayList<>();
+ private final List<JVMClusterUtil.ReplicationServerThread> replicationThreads =
+ new CopyOnWriteArrayList<>();
private final static int DEFAULT_NO = 1;
/** local mode */
public static final String LOCAL = "local";
@@ -259,6 +264,26 @@ public class LocalHBaseCluster {
});
}
+ @SuppressWarnings("unchecked")
+ public JVMClusterUtil.ReplicationServerThread addReplicationServer(
+ Configuration config, final int index) throws IOException {
+ // Create each replication server with its own Configuration instance so each has
+ // its Connection instance rather than share (see HBASE_INSTANCES down in
+ // the guts of ConnectionManager).
+ JVMClusterUtil.ReplicationServerThread rst =
+ JVMClusterUtil.createReplicationServerThread(config, index);
+ this.replicationThreads.add(rst);
+ return rst;
+ }
+
+ public JVMClusterUtil.ReplicationServerThread addReplicationServer(
+ final Configuration config, final int index, User user)
+ throws IOException, InterruptedException {
+ return user.runAs(
+ (PrivilegedExceptionAction<ReplicationServerThread>) () -> addReplicationServer(config,
+ index));
+ }
+
/**
* @param serverNumber
* @return region server
@@ -290,6 +315,40 @@ public class LocalHBaseCluster {
}
/**
+ * @param serverNumber replication server number
+ * @return replication server
+ */
+ public HReplicationServer getReplicationServer(int serverNumber) {
+ return replicationThreads.get(serverNumber).getReplicationServer();
+ }
+
+ /**
+ * @return Read-only list of replication server threads.
+ */
+ public List<JVMClusterUtil.ReplicationServerThread> getReplicationServers() {
+ return Collections.unmodifiableList(this.replicationThreads);
+ }
+
+ /**
+ * @return List of running servers (Some servers may have been killed or
+ * aborted during lifetime of cluster; these servers are not included in this
+ * list).
+ */
+ public List<JVMClusterUtil.ReplicationServerThread> getLiveReplicationServers() {
+ List<JVMClusterUtil.ReplicationServerThread> liveServers = new ArrayList<>();
+ List<ReplicationServerThread> list = getReplicationServers();
+ for (JVMClusterUtil.ReplicationServerThread rst: list) {
+ if (rst.isAlive()) {
+ liveServers.add(rst);
+ }
+ else {
+ LOG.info("Not alive {}", rst.getName());
+ }
+ }
+ return liveServers;
+ }
+
+ /**
* @return the Configuration used by this LocalHBaseCluster
*/
public Configuration getConfiguration() {
@@ -430,7 +489,7 @@ public class LocalHBaseCluster {
* Start the cluster.
*/
public void startup() throws IOException {
- JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
+ JVMClusterUtil.startup(this.masterThreads, this.regionThreads, this.replicationThreads);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index 2d0336d..8d85b85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -443,6 +443,19 @@ public class HReplicationServer extends Thread implements Server, ReplicationSou
return this.stopped;
}
+ public void waitForServerOnline(){
+ while (!isStopped() && !isOnline()) {
+ synchronized (online) {
+ try {
+ online.wait(msgInterval);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ }
+
/**
* Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
* be hooked up to WAL.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
index 1e2ac3e..1f76864 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.replication.HReplicationServer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +73,33 @@ public class JVMClusterUtil {
}
/**
+ * Datastructure to hold ReplicationServer Thread and ReplicationServer instance
+ */
+ public static class ReplicationServerThread extends Thread {
+ private final HReplicationServer replicationServer;
+
+ public ReplicationServerThread(final HReplicationServer r, final int index) {
+ super(r, "ReplicationServer:" + index + ";" + r.getServerName().toShortString());
+ this.replicationServer = r;
+ }
+
+ /**
+ * @return the replication server
+ */
+ public HReplicationServer getReplicationServer() {
+ return this.replicationServer;
+ }
+
+ /**
+ * Block until the replication server has come online, indicating it is ready
+ * to be used.
+ */
+ public void waitForServerOnline() {
+ replicationServer.waitForServerOnline();
+ }
+ }
+
+ /**
* Creates a {@link RegionServerThread}.
* Call 'start' on the returned thread to make it run.
* @param c Configuration to use.
@@ -98,6 +126,24 @@ public class JVMClusterUtil {
return new JVMClusterUtil.RegionServerThread(server, index);
}
+ /**
+ * Creates a {@link ReplicationServerThread}.
+ * Call 'start' on the returned thread to make it run.
+ * @param c Configuration to use.
+ * @param index Used distinguishing the object returned.
+ * @throws IOException
+ * @return Replication server added.
+ */
+ public static JVMClusterUtil.ReplicationServerThread createReplicationServerThread(
+ final Configuration c, final int index) throws IOException {
+ HReplicationServer server;
+ try {
+ server = new HReplicationServer(c);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return new JVMClusterUtil.ReplicationServerThread(server, index);
+ }
/**
* Datastructure to hold Master Thread and Master instance
@@ -122,7 +168,7 @@ public class JVMClusterUtil {
* @param c Configuration to use.
* @param hmc Class to create.
* @param index Used distinguishing the object returned.
- * @throws IOException
+ * @throws IOException exception
* @return Master added.
*/
public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
@@ -165,7 +211,8 @@ public class JVMClusterUtil {
* @return Address to use contacting primary master.
*/
public static String startup(final List<JVMClusterUtil.MasterThread> masters,
- final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
+ final List<JVMClusterUtil.RegionServerThread> regionservers,
+ final List<JVMClusterUtil.ReplicationServerThread> replicationServers) throws IOException {
// Implementation note: This method relies on timed sleeps in a loop. It's not great, and
// should probably be re-written to use actual synchronization objects, but it's ok for now
@@ -193,6 +240,12 @@ public class JVMClusterUtil {
}
}
+ if (replicationServers != null) {
+ for (JVMClusterUtil.ReplicationServerThread t: replicationServers) {
+ t.start();
+ }
+ }
+
// Wait for an active master to be initialized (implies being master)
// with this, when we return the cluster is complete
final int initTimeout = configuration != null ? Integer.parseInt(
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 840b9e0..fb31b3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1111,8 +1111,8 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
Configuration c = new Configuration(this.conf);
this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
- option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
- option.getMasterClass(), option.getRsClass());
+ option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
+ option.getNumReplicationServers(), option.getMasterClass(), option.getRsClass());
// Populate the master address configuration from mini cluster configuration.
conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
// Don't leave here till we've done a successful scan of the hbase:meta
@@ -1237,8 +1237,8 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
closeConnection();
this.hbaseCluster =
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
- option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
- option.getRsClass());
+ option.getNumRegionServers(), option.getRsPorts(), option.getNumReplicationServers(),
+ option.getMasterClass(), option.getRsClass());
// Don't leave here till we've done a successful scan of the hbase:meta
Connection conn = ConnectionFactory.createConnection(this.conf);
Table t = conn.getTable(TableName.META_TABLE_NAME);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index 990867e..eb71623 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -32,12 +32,14 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.HReplicationServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.ReplicationServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -87,10 +89,10 @@ public class MiniHBaseCluster extends HBaseCluster {
* @param numRegionServers initial number of region servers to start.
*/
public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
- Class<? extends HMaster> masterClass,
- Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+ Class<? extends HMaster> masterClass,
+ Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException {
- this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
+ this(conf, numMasters, 0, numRegionServers, null, 0, masterClass, regionserverClass);
}
/**
@@ -98,20 +100,22 @@ public class MiniHBaseCluster extends HBaseCluster {
* restart where for sure the regionservers come up on same address+port (but
* just with different startcode); by default mini hbase clusters choose new
* arbitrary ports on each cluster start.
+ * @param numReplicationServers initial number of replication servers to start.
* @throws IOException
* @throws InterruptedException
*/
public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
- int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
- Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+ int numRegionServers, List<Integer> rsPorts, int numReplicationServers,
+ Class<? extends HMaster> masterClass,
+ Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException {
super(conf);
// Hadoop 2
CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
- init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
- regionserverClass);
+ init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, numReplicationServers,
+ masterClass, regionserverClass);
this.initialClusterStatus = getClusterMetrics();
}
@@ -228,7 +232,8 @@ public class MiniHBaseCluster extends HBaseCluster {
}
private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
- final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
+ final int nRegionNodes, List<Integer> rsPorts, int numReplicationServers,
+ Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException {
try {
@@ -249,11 +254,17 @@ public class MiniHBaseCluster extends HBaseCluster {
if (rsPorts != null) {
rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
}
- User user = HBaseTestingUtility.getDifferentUser(rsConf,
- ".hfs."+index++);
+ User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++);
hbaseCluster.addRegionServer(rsConf, i, user);
}
+ // manually add the replication servers as other users
+ for (int i = 0; i < numReplicationServers; i++) {
+ Configuration rsConf = HBaseConfiguration.create(conf);
+ User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++);
+ hbaseCluster.addReplicationServer(rsConf, i, user);
+ }
+
hbaseCluster.startup();
} catch (IOException e) {
shutdown();
@@ -792,7 +803,7 @@ public class MiniHBaseCluster extends HBaseCluster {
/**
* Grab a numbered region server of your choice.
- * @param serverNumber
+ * @param serverNumber region server number
* @return region server
*/
public HRegionServer getRegionServer(int serverNumber) {
@@ -806,6 +817,43 @@ public class MiniHBaseCluster extends HBaseCluster {
.findFirst().orElse(null);
}
+ /**
+ * @return Number of live replication servers in the cluster currently.
+ */
+ public int getNumLiveReplicationServers() {
+ return this.hbaseCluster.getLiveReplicationServers().size();
+ }
+
+ /**
+ * @return List of replication server threads.
+ */
+ public List<JVMClusterUtil.ReplicationServerThread> getReplicationServerThreads() {
+ return this.hbaseCluster.getReplicationServers();
+ }
+
+ /**
+ * @return List of live replication server threads (skips the aborted and the killed)
+ */
+ public List<JVMClusterUtil.ReplicationServerThread> getLiveReplicationServerThreads() {
+ return this.hbaseCluster.getLiveReplicationServers();
+ }
+
+ /**
+ * Grab a numbered replication server of your choice.
+ * @param serverNumber
+ * @return replication server
+ */
+ public HReplicationServer getReplicationServer(int serverNumber) {
+ return hbaseCluster.getReplicationServer(serverNumber);
+ }
+
+ public HReplicationServer getReplicationServer(ServerName serverName) {
+ return hbaseCluster.getReplicationServers().stream()
+ .map(ReplicationServerThread::getReplicationServer)
+ .filter(r -> r.getServerName().equals(serverName))
+ .findFirst().orElse(null);
+ }
+
public List<HRegion> getRegions(byte[] tableName) {
return getRegions(TableName.valueOf(tableName));
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
index 7a9bd68..0aa35ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
@@ -75,6 +75,10 @@ public final class StartMiniClusterOption {
* The class to use as HRegionServer, or null for default.
*/
private Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass;
+ /**
+ * Number of replication servers to start up.
+ */
+ private final int numReplicationServers;
/**
* Number of datanodes. Used to create mini DSF cluster. Surpassed by {@link #dataNodeHosts} size.
@@ -109,7 +113,8 @@ public final class StartMiniClusterOption {
*/
private StartMiniClusterOption(int numMasters, int numAlwaysStandByMasters,
Class<? extends HMaster> masterClass, int numRegionServers, List<Integer> rsPorts,
- Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, int numDataNodes,
+ Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
+ int numReplicationServers, int numDataNodes,
String[] dataNodeHosts, int numZkServers, boolean createRootDir, boolean createWALDir) {
this.numMasters = numMasters;
this.numAlwaysStandByMasters = numAlwaysStandByMasters;
@@ -117,6 +122,7 @@ public final class StartMiniClusterOption {
this.numRegionServers = numRegionServers;
this.rsPorts = rsPorts;
this.rsClass = rsClass;
+ this.numReplicationServers = numReplicationServers;
this.numDataNodes = numDataNodes;
this.dataNodeHosts = dataNodeHosts;
this.numZkServers = numZkServers;
@@ -148,6 +154,10 @@ public final class StartMiniClusterOption {
return rsClass;
}
+ public int getNumReplicationServers() {
+ return numReplicationServers;
+ }
+
public int getNumDataNodes() {
return numDataNodes;
}
@@ -196,6 +206,7 @@ public final class StartMiniClusterOption {
private Class<? extends HMaster> masterClass = null;
private int numRegionServers = 1;
private List<Integer> rsPorts = null;
+ private int numReplicationServers;
private Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass = null;
private int numDataNodes = 1;
private String[] dataNodeHosts = null;
@@ -210,9 +221,9 @@ public final class StartMiniClusterOption {
if (dataNodeHosts != null && dataNodeHosts.length != 0) {
numDataNodes = dataNodeHosts.length;
}
- return new StartMiniClusterOption(numMasters,numAlwaysStandByMasters, masterClass,
- numRegionServers, rsPorts, rsClass, numDataNodes, dataNodeHosts, numZkServers,
- createRootDir, createWALDir);
+ return new StartMiniClusterOption(numMasters, numAlwaysStandByMasters, masterClass,
+ numRegionServers, rsPorts, rsClass, numReplicationServers,
+ numDataNodes, dataNodeHosts, numZkServers, createRootDir, createWALDir);
}
public Builder numMasters(int numMasters) {
@@ -269,6 +280,11 @@ public final class StartMiniClusterOption {
this.createWALDir = createWALDir;
return this;
}
+
+ public Builder numReplicationServers(int numReplicationServers) {
+ this.numReplicationServers = numReplicationServers;
+ return this;
+ }
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
index d97667b..a1cbebb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
@@ -77,8 +78,8 @@ public class TestReplicationServerSink {
private static HMaster MASTER;
- private static HReplicationServer replicationServer;
- private static ServerName replicationServerName;
+ private static HReplicationServer REPLICATION_SERVER;
+ private static ServerName REPLICATION_SERVER_NAME;
private static Path baseNamespaceDir;
private static Path hfileArchiveDir;
@@ -93,9 +94,13 @@ public class TestReplicationServerSink {
public static void beforeClass() throws Exception {
CONF.setLong(HBASE_CLIENT_OPERATION_TIMEOUT, 1000);
CONF.setLong(ONLINE_SERVER_REFRESH_INTERVAL, 10000);
- TEST_UTIL.startMiniCluster();
+ CONF.setBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, true);
+ TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numReplicationServers(1).build());
MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+ REPLICATION_SERVER = TEST_UTIL.getMiniHBaseCluster().getReplicationServerThreads().get(0)
+ .getReplicationServer();
+ REPLICATION_SERVER_NAME = REPLICATION_SERVER.getServerName();
Path rootDir = CommonFSUtils.getRootDir(CONF);
baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR));
@@ -110,11 +115,6 @@ public class TestReplicationServerSink {
@Before
public void before() throws Exception {
- replicationServer = new HReplicationServer(CONF);
- replicationServer.start();
- TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline());
- replicationServerName = replicationServer.getServerName();
-
TEST_UTIL.createTable(TABLENAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLENAME);
}
@@ -122,12 +122,6 @@ public class TestReplicationServerSink {
@After
public void after() throws IOException {
TEST_UTIL.deleteTableIfAny(TABLENAME);
- if (!replicationServer.isStopped()) {
- replicationServer.stop("test");
- }
- TEST_UTIL.waitFor(10000, () -> !replicationServer.isAlive());
- replicationServer = null;
- replicationServerName = null;
}
/**
@@ -138,10 +132,10 @@ public class TestReplicationServerSink {
AsyncClusterConnection conn =
TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
AsyncReplicationServerAdmin replAdmin =
- conn.getReplicationServerAdmin(replicationServerName);
+ conn.getReplicationServerAdmin(REPLICATION_SERVER_NAME);
ReplicationServerSinkPeer sinkPeer =
- new ReplicationServerSinkPeer(replicationServerName, replAdmin);
+ new ReplicationServerSinkPeer(REPLICATION_SERVER_NAME, replAdmin);
replicateWALEntryAndVerify(sinkPeer);
}
@@ -178,23 +172,30 @@ public class TestReplicationServerSink {
ReplicationServerManager replicationServerManager = MASTER.getReplicationServerManager();
assertNotNull(replicationServerManager);
TEST_UTIL.waitFor(60000, () -> !replicationServerManager.getOnlineServers().isEmpty()
- && null != replicationServerManager.getServerMetrics(replicationServerName));
+ && null != replicationServerManager.getServerMetrics(REPLICATION_SERVER_NAME));
// put data via replication server
testReplicateWAL();
- TEST_UTIL.waitFor(60000, () -> replicationServer.rpcServices.requestCount.sum() > 0
- && replicationServer.rpcServices.requestCount.sum() == replicationServerManager
- .getServerMetrics(replicationServerName).getRequestCount());
+ TEST_UTIL.waitFor(60000, () -> REPLICATION_SERVER.rpcServices.requestCount.sum() > 0
+ && REPLICATION_SERVER.rpcServices.requestCount.sum() == replicationServerManager
+ .getServerMetrics(REPLICATION_SERVER_NAME).getRequestCount());
}
@Test
public void testReplicationServerExpire() throws Exception {
+ int initialNum = TEST_UTIL.getMiniHBaseCluster().getNumLiveReplicationServers();
+ HReplicationServer replicationServer = new HReplicationServer(CONF);
+ replicationServer.start();
+ ServerName replicationServerName = replicationServer.getServerName();
+
ReplicationServerManager replicationServerManager = MASTER.getReplicationServerManager();
- TEST_UTIL.waitFor(60000, () -> !replicationServerManager.getOnlineServers().isEmpty()
+ TEST_UTIL.waitFor(60000, () ->
+ initialNum + 1 == replicationServerManager.getOnlineServers().size()
&& null != replicationServerManager.getServerMetrics(replicationServerName));
replicationServer.stop("test");
- TEST_UTIL.waitFor(180000, 1000, replicationServerManager.getOnlineServers()::isEmpty);
+ TEST_UTIL.waitFor(180000, 1000, () ->
+ initialNum == replicationServerManager.getOnlineServers().size());
assertNull(replicationServerManager.getServerMetrics(replicationServerName));
}
}
diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml
index 5e64bfc..36187a3 100644
--- a/hbase-server/src/test/resources/hbase-site.xml
+++ b/hbase-server/src/test/resources/hbase-site.xml
@@ -90,6 +90,13 @@
</description>
</property>
<property>
+ <name>hbase.replicationserver.port</name>
+ <value>0</value>
+ <description>Always have replicationservers come up on port '0' so we don't clash over
+ default ports.
+ </description>
+ </property>
+ <property>
<name>hbase.ipc.client.fallback-to-simple-auth-allowed</name>
<value>true</value>
</property>