You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2014/08/01 00:08:20 UTC

[44/50] [abbrv] git commit: [HBASE-11592] Allow number of SplitLogWorker threads to be online-configurable

[HBASE-11592] Allow number of SplitLogWorker threads to be online-configurable

Test Plan:
mvn test -Dtest.output=true -Dtest=TestRegionServerOnlineConfigChange --
successful

Reviewers: avandever, manukranthk, rshroff, aaiyer, adela, fan, daviddeng, gauravm

Reviewed By: gauravm

CC: hbase-eng@fb.com

Differential Revision: https://phabricator.fb.com/D1461935

Tasks: 4516873

git-svn-id: svn+ssh://tubbs/svnhive/hadoop/branches/titan/VENDOR.hbase/hbase-trunk@43780 e7acf4d4-3532-417f-9e73-7a9ae25a1f51


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cfa65f89
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cfa65f89
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cfa65f89

Branch: refs/heads/0.89-fb
Commit: cfa65f89c6af2b66890a4e651c5b1b84bc772e0e
Parents: 7281aa1
Author: jgoguen <jg...@e7acf4d4-3532-417f-9e73-7a9ae25a1f51>
Authored: Wed Jul 30 17:00:28 2014 +0000
Committer: Elliott Clark <el...@fb.com>
Committed: Thu Jul 31 14:44:24 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/HConstants.java     |  5 ++
 .../hbase/regionserver/HRegionServer.java       | 75 +++++++++++++++++---
 .../TestRegionServerOnlineConfigChange.java     | 21 +++++-
 3 files changed, 90 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cfa65f89/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 8be87a4..24cc8f7 100644
--- a/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -397,6 +397,11 @@ public final class HConstants {
     "hbase.hregionserver.hlog.split.workers.num";
 
   /**
+   * The default number of threads used for splitting logs in parallel.
+   */
+  public static final int DEFAULT_HREGIONSERVER_SPLITLOG_WORKERS_NUM = 3;
+
+  /**
    * If using quorum reads from HDFS, the maximum size of the thread pool.
    * value <= 0 disables quorum reads.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/cfa65f89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7c1c736..207760f 100755
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1976,16 +1976,8 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
       this.server.start();
       isRpcServerRunning = true;
     }
-    int numSplitLogWorkers = conf.getInt(HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, 3);
-    // Create the log splitting worker and start it
-    this.splitLogWorkers = new ArrayList<SplitLogWorker>(numSplitLogWorkers);
-    for (int i = 0; i < numSplitLogWorkers; i++) {
-      SplitLogWorker splitLogWorker = new SplitLogWorker(this.zooKeeperWrapper,
-          this.getConfiguration(), this.serverInfo.getServerName(),
-          logCloseThreadPool, masterRef);
-      this.splitLogWorkers.add(splitLogWorker);
-      splitLogWorker.start();
-    }
+    int numSplitLogWorkers = conf.getInt(HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, HConstants.DEFAULT_HREGIONSERVER_SPLITLOG_WORKERS_NUM);
+    this.setNumberOfSplitLogWorkerThreads(numSplitLogWorkers);
     // start the scanner prefetch threadpool
     int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10);
     scanPrefetchThreadPool =
@@ -1996,6 +1988,62 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
       this.serverInfo.getServerAddress().toString());
   }
 
+  /**
+   * Set the number of {@link SplitLogWorker} threads available. This is
+   * intended to be callable at any time during the HRegionServer's lifetime;
+   * it is first called during {@link #startServiceThreads()} and creates the
+   * backing {@link ArrayList} if it doesn't already exist.
+   *
+   * When the number of worker threads to use is less than the number of
+   * workers in use, extraneous workers are stopped. Worker threads at the
+   * start of the {@link ArrayList} are left alone, threads beyond index
+   * (numWorkers - 1) are stopped.
+   *
+   * When the number of worker threads to use is greater than the number of
+   * workers in use, the appropriate number of additional {@link SplitLogWorker}
+   * threads are added.
+   *
+   * @param numWorkers The number of worker threads to have running; values of zero or less are ignored.
+   */
+  private void setNumberOfSplitLogWorkerThreads(int numWorkers) {
+    if (numWorkers <= 0 || (this.splitLogWorkers != null && numWorkers == this.splitLogWorkers.size())) {
+      // Don't accept zero or fewer workers, and don't do anything if we try to
+      // set the same number of threads already in use.
+      return;
+    }
+
+    if (null == this.splitLogWorkers) {
+      this.splitLogWorkers = new ArrayList<SplitLogWorker>(numWorkers);
+    }
+
+    if (numWorkers < this.splitLogWorkers.size()) {
+      // Fewer workers, stop threads
+      for (int idx = this.splitLogWorkers.size() - 1; idx >= numWorkers; idx--) {
+        this.splitLogWorkers.remove(idx).stop();
+      }
+    } else {
+      // More workers, spin up threads
+      int workersNeeded = numWorkers - this.splitLogWorkers.size();
+      do {
+        SplitLogWorker splitLogWorker = new SplitLogWorker(this.zooKeeperWrapper,
+          this.getConfiguration(), this.serverInfo.getServerName(),
+          logCloseThreadPool, masterRef);
+        this.splitLogWorkers.add(splitLogWorker);
+        splitLogWorker.start();
+        workersNeeded--;
+      } while (workersNeeded > 0);
+    }
+  }
+
+  /**
+   * Get the number of {@link SplitLogWorker} threads currently available.
+   *
+   * @return The number of {@link SplitLogWorker} threads currently running.
+   */
+  public int getNumberOfSplitLogWorkerThreads() {
+    return this.splitLogWorkers.size();
+  }
+
   /*
    * Verify that server is healthy
    */
@@ -4181,6 +4229,8 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
         HConstants.HDFS_QUORUM_READ_THREADS_MAX, HConstants.DEFAULT_HDFS_QUORUM_READ_THREADS_MAX);
     long timeout = conf.getLong(
         HConstants.HDFS_QUORUM_READ_TIMEOUT_MILLIS, HConstants.DEFAULT_HDFS_QUORUM_READ_TIMEOUT_MILLIS);
+    int numSplitLogWorkers = conf.getInt(
+        HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, HConstants.DEFAULT_HREGIONSERVER_SPLITLOG_WORKERS_NUM);
 
     boolean origProfiling = enableServerSideProfilingForAllCalls.get();
     boolean newProfiling = conf.getBoolean(
@@ -4201,6 +4251,11 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
       this.setHDFSQuorumReadTimeoutMillis(timeout);
     }
 
+    if (numSplitLogWorkers != this.splitLogWorkers.size()) {
+      LOG.info("Number of SplitLogWorker threads is changed from " + this.splitLogWorkers.size() + " to " + numSplitLogWorkers);
+      this.setNumberOfSplitLogWorkerThreads(numSplitLogWorkers);
+    }
+
     HRegionServer.useSeekNextUsingHint =
         conf.getBoolean("hbase.regionserver.scan.timestampfilter.allow_seek_next_using_hint", true);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/cfa65f89/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
index 3b3ac28..2f2b733 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
@@ -364,5 +364,24 @@ public class TestRegionServerOnlineConfigChange extends TestCase {
     assertEquals(tableDescPropCustomVal, s1.conf.get(tableDescProp));
     assertEquals(tableDescPropCustomVal, s2.conf.get(tableDescProp));
   }
-}
 
+  public void testSetSplitLogWorkerThreadCount() {
+    int firstLogWorkerLimit = 5;
+    int secondLogWorkerLimit = 10;
+    int thirdLogWorkerLimit = 3;
+
+    assertEquals(HConstants.DEFAULT_HREGIONSERVER_SPLITLOG_WORKERS_NUM, rs1.getNumberOfSplitLogWorkerThreads());
+
+    conf.setInt(HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, firstLogWorkerLimit);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(firstLogWorkerLimit, rs1.getNumberOfSplitLogWorkerThreads());
+
+    conf.setInt(HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, secondLogWorkerLimit);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(secondLogWorkerLimit, rs1.getNumberOfSplitLogWorkerThreads());
+
+    conf.setInt(HConstants.HREGIONSERVER_SPLITLOG_WORKERS_NUM, thirdLogWorkerLimit);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(thirdLogWorkerLimit, rs1.getNumberOfSplitLogWorkerThreads());
+  }
+}