You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ni...@apache.org on 2021/07/14 15:21:18 UTC
[hbase] branch HBASE-25714 updated: HBASE-25830 HBaseCluster support CompactionServer for UTs (addendum) (#3464)
This is an automated email from the ASF dual-hosted git repository.
niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-25714 by this push:
new 27ec80d HBASE-25830 HBaseCluster support CompactionServer for UTs (addendum) (#3464)
27ec80d is described below
commit 27ec80d6e321ff04dcc8424b8e6e5ca0ba7374d0
Author: niuyulin <yu...@gmail.com>
AuthorDate: Wed Jul 14 23:20:42 2021 +0800
HBASE-25830 HBaseCluster support CompactionServer for UTs (addendum) (#3464)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../org/apache/hadoop/hbase/LocalHBaseCluster.java | 57 ++++++++++-
.../apache/hadoop/hbase/util/JVMClusterUtil.java | 113 ++++++++++++---------
.../org/apache/hadoop/hbase/MiniHBaseCluster.java | 41 ++++++++
.../compactionserver/TestCompactionServer.java | 33 ++++--
4 files changed, 186 insertions(+), 58 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
index ec55182..b856c7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
@@ -168,6 +168,19 @@ public class LocalHBaseCluster {
LOG.debug("Setting RS InfoServer Port to random.");
conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
}
+ if (conf.getInt(HConstants.COMPACTION_SERVER_PORT, HConstants.DEFAULT_COMPACTION_SERVER_PORT)
+ == HConstants.DEFAULT_COMPACTION_SERVER_PORT) {
+ LOG.debug("Setting CompactionServer Port to random.");
+ conf.set(HConstants.COMPACTION_SERVER_PORT, "0");
+ }
+ // treat info ports special; expressly don't change '-1' (keep off)
+ // in case we make that the default behavior.
+ if (conf.getInt(HConstants.COMPACTION_SERVER_INFO_PORT, 0) != -1 && conf.getInt(
+ HConstants.COMPACTION_SERVER_INFO_PORT, HConstants.DEFAULT_COMPACTION_SERVER_INFOPORT)
+ == HConstants.DEFAULT_COMPACTION_SERVER_INFOPORT) {
+ LOG.debug("Setting CS InfoServer Port to random.");
+ conf.set(HConstants.COMPACTION_SERVER_INFO_PORT, "0");
+ }
if (conf.getInt(HConstants.MASTER_INFO_PORT, 0) != -1 &&
conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT)
== HConstants.DEFAULT_MASTER_INFOPORT) {
@@ -309,6 +322,17 @@ public class LocalHBaseCluster {
}
/**
+ * Wait for the specified compaction server to stop. Removes this thread from list of running
+ * threads.
+ * @return Name of compaction server that just went down.
+ */
+ public String waitOnCompactionServer(int serverNumber) {
+ JVMClusterUtil.CompactionServerThread regionServerThread =
+ this.compactionServerThreads.get(serverNumber);
+ return waitOnCompactionServer(regionServerThread);
+ }
+
+ /**
* Wait for the specified region server to stop. Removes this thread from list of running threads.
* @return Name of region server that just went down.
*/
@@ -327,6 +351,25 @@ public class LocalHBaseCluster {
}
/**
+ * Wait for the specified compaction server to stop. Removes this thread from list of running
+ * threads.
+ * @return Name of compaction server that just went down.
+ */
+ public String waitOnCompactionServer(JVMClusterUtil.CompactionServerThread cst) {
+ while (cst.isAlive()) {
+ try {
+ LOG.info("Waiting on " + cst.getCompactionServer().toString());
+ cst.join();
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for {} to finish. Retrying join", cst.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ compactionServerThreads.remove(cst);
+ return cst.getName();
+ }
+
+ /**
* @return the HMaster thread
*/
public HMaster getMaster(int serverNumber) {
@@ -427,6 +470,18 @@ public class LocalHBaseCluster {
}
}
}
+ if (this.compactionServerThreads != null) {
+ for(Thread t: this.compactionServerThreads) {
+ if (t.isAlive()) {
+ try {
+ Threads.threadDumpingIsAlive(t);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted", e);
+ }
+ }
+ }
+ }
+
}
@SuppressWarnings("unchecked")
@@ -491,7 +546,7 @@ public class LocalHBaseCluster {
* Shut down the mini HBase cluster
*/
public void shutdown() {
- JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
+ JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads, this.compactionServerThreads);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
index 57a74a0..6791900 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
@@ -294,12 +294,64 @@ public class JVMClusterUtil {
}
+ private static <T extends Thread> boolean shutdown(final List<T> servers,
+ boolean wasInterrupted) {
+ final long maxTime = System.currentTimeMillis() + 30 * 1000;
+ // first try nicely.
+ for (Thread t : servers) {
+ long now = System.currentTimeMillis();
+ if (t.isAlive() && !wasInterrupted && now < maxTime) {
+ try {
+ t.join(maxTime - now);
+ } catch (InterruptedException e) {
+ LOG.info(
+ "Got InterruptedException on shutdown - " + "not waiting anymore on region server ends",
+ e);
+ wasInterrupted = true; // someone wants us to speed up.
+ }
+ }
+ }
+
+ // Let's try to interrupt the remaining threads if any.
+ for (int i = 0; i < 100; ++i) {
+ boolean atLeastOneLiveServer = false;
+ for (Thread t : servers) {
+ if (t.isAlive()) {
+ atLeastOneLiveServer = true;
+ try {
+ LOG.warn("{} remaining, give one more chance before interrupting",
+ t.getClass().getSimpleName());
+ t.join(1000);
+ } catch (InterruptedException e) {
+ wasInterrupted = true;
+ }
+ }
+ }
+ if (!atLeastOneLiveServer) {
+ break;
+ }
+ for (Thread t : servers) {
+ if (t.isAlive()) {
+ LOG.warn(
+ "{} taking too long to stop, interrupting; thread dump " + "if > 3 attempts: i=" + i,
+ t.getClass().getSimpleName());
+ if (i > 3) {
+ Threads.printThreadInfo(System.out, "Thread dump " + t.getName());
+ }
+ t.interrupt();
+ }
+ }
+ }
+ return wasInterrupted;
+ }
+
/**
* @param masters
* @param regionservers
*/
public static void shutdown(final List<MasterThread> masters,
- final List<RegionServerThread> regionservers) {
+ final List<RegionServerThread> regionservers,
+ final List<CompactionServerThread> compactionServers) {
LOG.debug("Shutting down HBase Cluster");
if (masters != null) {
// Do backups first.
@@ -335,51 +387,18 @@ public class JVMClusterUtil {
}
}
boolean wasInterrupted = false;
- final long maxTime = System.currentTimeMillis() + 30 * 1000;
if (regionservers != null) {
- // first try nicely.
for (RegionServerThread t : regionservers) {
t.getRegionServer().stop("Shutdown requested");
}
- for (RegionServerThread t : regionservers) {
- long now = System.currentTimeMillis();
- if (t.isAlive() && !wasInterrupted && now < maxTime) {
- try {
- t.join(maxTime - now);
- } catch (InterruptedException e) {
- LOG.info("Got InterruptedException on shutdown - " +
- "not waiting anymore on region server ends", e);
- wasInterrupted = true; // someone wants us to speed up.
- }
- }
- }
-
- // Let's try to interrupt the remaining threads if any.
- for (int i = 0; i < 100; ++i) {
- boolean atLeastOneLiveServer = false;
- for (RegionServerThread t : regionservers) {
- if (t.isAlive()) {
- atLeastOneLiveServer = true;
- try {
- LOG.warn("RegionServerThreads remaining, give one more chance before interrupting");
- t.join(1000);
- } catch (InterruptedException e) {
- wasInterrupted = true;
- }
- }
- }
- if (!atLeastOneLiveServer) break;
- for (RegionServerThread t : regionservers) {
- if (t.isAlive()) {
- LOG.warn("RegionServerThreads taking too long to stop, interrupting; thread dump " +
- "if > 3 attempts: i=" + i);
- if (i > 3) {
- Threads.printThreadInfo(System.out, "Thread dump " + t.getName());
- }
- t.interrupt();
- }
- }
+ wasInterrupted = shutdown(regionservers, wasInterrupted);
+ }
+ if (compactionServers != null) {
+ // first try nicely.
+ for (CompactionServerThread t : compactionServers) {
+ t.getCompactionServer().stop("Shutdown requested");
}
+ wasInterrupted = shutdown(compactionServers, wasInterrupted);
}
if (masters != null) {
@@ -398,12 +417,14 @@ public class JVMClusterUtil {
}
}
}
- LOG.info("Shutdown of " +
- ((masters != null) ? masters.size() : "0") + " master(s) and " +
- ((regionservers != null) ? regionservers.size() : "0") +
- " regionserver(s) " + (wasInterrupted ? "interrupted" : "complete"));
+ LOG.info("Shutdown of " + ((masters != null) ? masters.size() : "0") + " master(s) and "
+ + ((regionservers != null) ? regionservers.size() : "0") + " regionserver(s) "
+ + ((compactionServers != null) ? compactionServers.size() : "0") + " compactionServer(s) "
+ + (wasInterrupted ? "interrupted" : "complete")
+
+ );
- if (wasInterrupted){
+ if (wasInterrupted) {
Thread.currentThread().interrupt();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index fc36ac8..05fad48 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -436,6 +436,25 @@ public class MiniHBaseCluster extends HBaseCluster {
return startRegionServer(newConf);
}
+ /**
+ * Starts a compaction server thread running
+ * @return New CompactionServerThread
+ */
+ public JVMClusterUtil.CompactionServerThread startCompactionServer() throws IOException {
+ final Configuration configuration = HBaseConfiguration.create(conf);
+ User rsUser = HBaseTestingUtility.getDifferentUser(configuration, ".hfs." + index++);
+ JVMClusterUtil.CompactionServerThread t = null;
+ try {
+ t = hbaseCluster.addCompactionServer(configuration,
+ hbaseCluster.getCompactionServers().size(), rsUser);
+ t.start();
+ t.waitForServerOnline();
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted adding compactionserver to cluster", ie);
+ }
+ return t;
+ }
+
private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
throws IOException {
User rsUser =
@@ -503,6 +522,20 @@ public class MiniHBaseCluster extends HBaseCluster {
}
/**
+ * Shut down the specified compaction server cleanly
+ *
+ * @param serverNumber Used as index into a list.
+ * @return the compaction server that was stopped
+ */
+ public JVMClusterUtil.CompactionServerThread stopCompactionServer(int serverNumber) {
+ JVMClusterUtil.CompactionServerThread server =
+ hbaseCluster.getCompactionServers().get(serverNumber);
+ LOG.info("Stopping " + server.toString());
+ server.getCompactionServer().stop("Stopping rs " + serverNumber);
+ return server;
+ }
+
+ /**
* Shut down the specified region server cleanly
*
* @param serverNumber Used as index into a list.
@@ -557,6 +590,14 @@ public class MiniHBaseCluster extends HBaseCluster {
return this.hbaseCluster.waitOnRegionServer(serverNumber);
}
+ /**
+ * Wait for the specified compaction server to stop. Removes this thread from list
+ * of running threads.
+ * @return Name of compaction server that just went down.
+ */
+ public String waitOnCompactionServer(final int serverNumber) {
+ return this.hbaseCluster.waitOnCompactionServer(serverNumber);
+ }
/**
* Starts a master thread running
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
index dfc17e4..6d0cccc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
@@ -95,8 +95,12 @@ public class TestCompactionServer {
@Before
public void before() throws Exception {
- TEST_UTIL.createTable(TABLENAME, FAMILY);
+ TableDescriptor tableDescriptor =
+ TableDescriptorBuilder.newBuilder(TABLENAME).setCompactionOffloadEnabled(true).build();
+ TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(FAMILY),
+ TEST_UTIL.getConfiguration());
TEST_UTIL.waitTableAvailable(TABLENAME);
+ COMPACTION_SERVER.requestCount.reset();
}
@After
@@ -169,9 +173,9 @@ public class TestCompactionServer {
TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
ColumnFamilyDescriptor cfd =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setMaxVersions(3).build();
- TableDescriptor modifiedtableDescriptor =
- TableDescriptorBuilder.newBuilder(TABLENAME).setColumnFamily(cfd).build();
- TEST_UTIL.getAdmin().modifyTable(modifiedtableDescriptor);
+ TableDescriptor modifiedTableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
+ .setColumnFamily(cfd).setCompactionOffloadEnabled(true).build();
+ TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor);
TEST_UTIL.waitTableAvailable(TABLENAME);
doFillRecord(1, 500, RandomUtils.nextBytes(20));
doFillRecord(1, 500, RandomUtils.nextBytes(20));
@@ -198,6 +202,8 @@ public class TestCompactionServer {
return hFileCount == 1;
});
+ // Ensure the compaction ran on the compaction server (its requestCount must be non-zero)
+ TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
kVCount = 0;
for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
for (HStoreFile hStoreFile : region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
@@ -208,11 +214,11 @@ public class TestCompactionServer {
verifyRecord(1, 500, true);
}
-
@Test
public void testCompactionServerDown() throws Exception {
TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
- COMPACTION_SERVER.stop("test");
+ TEST_UTIL.getHBaseCluster().stopCompactionServer(0);
+ TEST_UTIL.getHBaseCluster().waitOnCompactionServer(0);
TEST_UTIL.waitFor(60000,
() -> MASTER.getCompactionOffloadManager().getOnlineServersList().size() == 0);
doPutRecord(1, 1000, true);
@@ -232,6 +238,12 @@ public class TestCompactionServer {
return hFile == 1;
});
verifyRecord(1, 1000, true);
+ TEST_UTIL.getHBaseCluster().startCompactionServer();
+ COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
+ .getCompactionServer();
+ COMPACTION_SERVER_NAME = COMPACTION_SERVER.getServerName();
+ TEST_UTIL.waitFor(60000,
+ () -> MASTER.getCompactionOffloadManager().getOnlineServersList().size() == 1);
}
@Test
@@ -277,20 +289,19 @@ public class TestCompactionServer {
TableDescriptor htd =
TableDescriptorBuilder.newBuilder(TEST_UTIL.getAdmin().getDescriptor(TABLENAME))
- .setCompactionOffloadEnabled(true).build();
+ .setCompactionOffloadEnabled(false).build();
TEST_UTIL.getAdmin().modifyTable(htd);
TEST_UTIL.waitUntilAllRegionsAssigned(TABLENAME);
// invoke compact
TEST_UTIL.compact(TABLENAME, false);
- TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
- long requestCount = COMPACTION_SERVER.requestCount.sum();
+ TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() == 0);
htd = TableDescriptorBuilder.newBuilder(TEST_UTIL.getAdmin().getDescriptor(TABLENAME))
- .setCompactionOffloadEnabled(false).build();
+ .setCompactionOffloadEnabled(true).build();
TEST_UTIL.getAdmin().modifyTable(htd);
TEST_UTIL.waitUntilAllRegionsAssigned(TABLENAME);
// invoke compact
TEST_UTIL.compact(TABLENAME, false);
- TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() == requestCount);
+ TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
}
}