You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2014/08/01 00:08:20 UTC
[44/50] [abbrv] git commit: [HBASE-11592] Allow number of
SplitLogWorker threads to be online-configurable
[HBASE-11592] Allow number of SplitLogWorker threads to be online-configurable
Test Plan:
mvn test -Dtest.output=true -Dtest=TestRegionServerOnlineConfigChange --
successful
Reviewers: avandever, manukranthk, rshroff, aaiyer, adela, fan, daviddeng, gauravm
Reviewed By: gauravm
CC: hbase-eng@fb.com
Differential Revision: https://phabricator.fb.com/D1461935
Tasks: 4516873
git-svn-id: svn+ssh://tubbs/svnhive/hadoop/branches/titan/VENDOR.hbase/hbase-trunk@43780 e7acf4d4-3532-417f-9e73-7a9ae25a1f51
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cfa65f89
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cfa65f89
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cfa65f89
Branch: refs/heads/0.89-fb
Commit: cfa65f89c6af2b66890a4e651c5b1b84bc772e0e
Parents: 7281aa1
Author: jgoguen <jg...@e7acf4d4-3532-417f-9e73-7a9ae25a1f51>
Authored: Wed Jul 30 17:00:28 2014 +0000
Committer: Elliott Clark <el...@fb.com>
Committed: Thu Jul 31 14:44:24 2014 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/HConstants.java | 5 ++
.../hbase/regionserver/HRegionServer.java | 75 +++++++++++++++++---
.../TestRegionServerOnlineConfigChange.java | 21 +++++-
3 files changed, 90 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/cfa65f89/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 8be87a4..24cc8f7 100644
--- a/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -397,6 +397,11 @@ public final class HConstants {
"hbase.hregionserver.hlog.split.workers.num";
/**
+ * The default number of threads used for splitting logs in parallel.
+ */
+ public static final int DEFAULT_HREGIONSERVER_SPLITLOG_WORKERS_NUM = 3;
+
+ /**
* If using quorum reads from HDFS, the maximum size of the thread pool.
* value <= 0 disables quorum reads.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/cfa65f89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7c1c736..207760f 100755
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1976,16 +1976,8 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
this.server.start();
isRpcServerRunning = true;
}
- int numSplitLogWorkers = conf.getInt(HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, 3);
- // Create the log splitting worker and start it
- this.splitLogWorkers = new ArrayList<SplitLogWorker>(numSplitLogWorkers);
- for (int i = 0; i < numSplitLogWorkers; i++) {
- SplitLogWorker splitLogWorker = new SplitLogWorker(this.zooKeeperWrapper,
- this.getConfiguration(), this.serverInfo.getServerName(),
- logCloseThreadPool, masterRef);
- this.splitLogWorkers.add(splitLogWorker);
- splitLogWorker.start();
- }
+ int numSplitLogWorkers = conf.getInt(HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, HConstants.DEFAULT_HREGIONSERVER_SPLITLOG_WORKERS_NUM);
+ this.setNumberOfSplitLogWorkerThreads(numSplitLogWorkers);
// start the scanner prefetch threadpool
int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10);
scanPrefetchThreadPool =
@@ -1996,6 +1988,62 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
this.serverInfo.getServerAddress().toString());
}
+ /**
+ * Set the number of {@link SplitLogWorker} threads available. This is
+ * intended to be called at any time during the HRegionServer's lifetime,
+ * first called during {@link #startServiceThreads()}, and will create the
+ * backing {@link ArrayList} if it doesn't already exist.
+ *
+ * When the number of worker threads to use is less than the number of
+ * workers in use, extraneous workers are stopped. Worker threads at the
+ * start of the {@link ArrayList} are left alone, threads beyond index
+ * (numWorkers - 1) are stopped.
+ *
+ * When the number of worker threads to use is greater than the number of
+ * workers in use, the appropriate number of additional {@link SplitLogWorker}
+ * threads are added.
+ *
+ * @param numWorkers The number of worker threads to have running.
+ */
+ private void setNumberOfSplitLogWorkerThreads(int numWorkers) {
+ if (numWorkers <= 0 || (this.splitLogWorkers != null && numWorkers == this.splitLogWorkers.size())) {
+ // Don't accept zero or fewer workers, and don't do anything if we try to
+ // set the same number of threads already in use.
+ return;
+ }
+
+ if (null == this.splitLogWorkers) {
+ this.splitLogWorkers = new ArrayList<SplitLogWorker>(numWorkers);
+ }
+
+ if (numWorkers < this.splitLogWorkers.size()) {
+ // Fewer workers, stop threads
+ for (int idx = this.splitLogWorkers.size() - 1; idx >= numWorkers; idx--) {
+ this.splitLogWorkers.remove(idx).stop();
+ }
+ } else {
+ // More workers, spin up threads
+ int workersNeeded = numWorkers - this.splitLogWorkers.size();
+ do {
+ SplitLogWorker splitLogWorker = new SplitLogWorker(this.zooKeeperWrapper,
+ this.getConfiguration(), this.serverInfo.getServerName(),
+ logCloseThreadPool, masterRef);
+ this.splitLogWorkers.add(splitLogWorker);
+ splitLogWorker.start();
+ workersNeeded--;
+ } while (workersNeeded > 0);
+ }
+ }
+
+ /**
+ * Get the number of {@link SplitLogWorker} threads currently available.
+ *
+ * @return The number of {@link SplitLotWorker} threads currently running.
+ */
+ public int getNumberOfSplitLogWorkerThreads() {
+ return this.splitLogWorkers.size();
+ }
+
/*
* Verify that server is healthy
*/
@@ -4181,6 +4229,8 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
HConstants.HDFS_QUORUM_READ_THREADS_MAX, HConstants.DEFAULT_HDFS_QUORUM_READ_THREADS_MAX);
long timeout = conf.getLong(
HConstants.HDFS_QUORUM_READ_TIMEOUT_MILLIS, HConstants.DEFAULT_HDFS_QUORUM_READ_TIMEOUT_MILLIS);
+ int numSplitLogWorkers = conf.getInt(
+ HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, HConstants.DEFAULT_HREGIONSERVER_SPLITLOG_WORKERS_NUM);
boolean origProfiling = enableServerSideProfilingForAllCalls.get();
boolean newProfiling = conf.getBoolean(
@@ -4201,6 +4251,11 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
this.setHDFSQuorumReadTimeoutMillis(timeout);
}
+ if (numSplitLogWorkers != this.splitLogWorkers.size()) {
+ LOG.info("Number of SplitLogWorker threads is changed from " + this.splitLogWorkers.size() + " to " + numSplitLogWorkers);
+ this.setNumberOfSplitLogWorkerThreads(numSplitLogWorkers);
+ }
+
HRegionServer.useSeekNextUsingHint =
conf.getBoolean("hbase.regionserver.scan.timestampfilter.allow_seek_next_using_hint", true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/cfa65f89/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
index 3b3ac28..2f2b733 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
@@ -364,5 +364,24 @@ public class TestRegionServerOnlineConfigChange extends TestCase {
assertEquals(tableDescPropCustomVal, s1.conf.get(tableDescProp));
assertEquals(tableDescPropCustomVal, s2.conf.get(tableDescProp));
}
-}
+ public void testSetSplitLogWorkerThreadCount() {
+ int firstLogWorkerLimit = 5;
+ int secondLogWorkerLimit = 10;
+ int thirdLogWorkerLimit = 3;
+
+ assertEquals(HConstants.DEFAULT_HREGIONSERVER_SPLITLOG_WORKERS_NUM, rs1.getNumberOfSplitLogWorkerThreads());
+
+ conf.setInt(HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, firstLogWorkerLimit);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(firstLogWorkerLimit, rs1.getNumberOfSplitLogWorkerThreads());
+
+ conf.setInt(HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, secondLogWorkerLimit);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(secondLogWorkerLimit, rs1.getNumberOfSplitLogWorkerThreads());
+
+ conf.setInt(HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, thirdLogWorkerLimit);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(thirdLogWorkerLimit, rs1.getNumberOfSplitLogWorkerThreads());
+ }
+}