You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2021/03/09 17:04:24 UTC

[hbase] branch branch-2 updated: HBASE-25547: Thread pools should release unused resources (#3037)

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 321b5c7   HBASE-25547: Thread pools should release unused resources (#3037)
321b5c7 is described below

commit 321b5c7979d4d3a4098b22b8cf22796187717981
Author: Bharath Vissapragada <bh...@apache.org>
AuthorDate: Tue Mar 9 09:03:16 2021 -0800

     HBASE-25547: Thread pools should release unused resources (#3037)
    
    * HBASE-25547: Thread pools should release unused resources (#2922)
    
    Plumbs the configuration needed to enable core thread timeout on non-critical thread pools.
    Currently enabled only for the thread pools with executor types RS_LOG_REPLAY_OPS,
    RS_PARALLEL_SEEK, MASTER_SNAPSHOT_OPERATIONS and MASTER_MERGE_OPERATIONS. Others can be
    added later as needed.
    
    Signed-off-by: Michael Stack <st...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
    (cherry picked from commit 618236dd9040625cf38a4d98338ccff8ab35bfec)
    
    * HBASE-25547 (addendum): Roll ExecutorType into ExecutorConfig
    
    Signed-off-by: Wellington Ramos Chevreuil <wc...@apache.org>
    (cherry picked from commit 4c822d7463589796472725524be4fa2509c7bcef)
---
 .../hadoop/hbase/executor/ExecutorService.java     | 111 +++++++++++++++------
 .../org/apache/hadoop/hbase/master/HMaster.java    |  52 ++++++----
 .../hadoop/hbase/regionserver/HRegionServer.java   |  73 +++++++++-----
 .../regionserver/RegionServicesForStores.java      |   8 +-
 .../hadoop/hbase/executor/TestExecutorService.java |  14 +--
 .../regionserver/TestHRegionReplayEvents.java      |   6 +-
 .../hbase/regionserver/TestSplitLogWorker.java     |   4 +-
 7 files changed, 184 insertions(+), 84 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 2fe4462..a49beef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -37,6 +37,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -50,11 +51,10 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * and a <code>Runnable</code> that handles the object that is added to the queue.
  *
  * <p>In order to create a new service, create an instance of this class and
- * then do: <code>instance.startExecutorService("myService");</code>.  When done
- * call {@link #shutdown()}.
+ * then do: <code>instance.startExecutorService(executorConfig);</code>. {@link ExecutorConfig}
+ * wraps the configuration needed by this service. When done call {@link #shutdown()}.
  *
- * <p>In order to use the service created above, call
- * {@link #submit(EventHandler)}.
+ * <p>In order to use the service created above, call {@link #submit(EventHandler)}.
  */
 @InterfaceAudience.Private
 public class ExecutorService {
@@ -81,14 +81,15 @@ public class ExecutorService {
   /**
    * Start an executor service with a given name. If there was a service already
    * started with the same name, this throws a RuntimeException.
-   * @param name Name of the service to start.
+   * @param config Configuration to use for the executor.
    */
-  public void startExecutorService(String name, int maxThreads) {
+  public void startExecutorService(final ExecutorConfig config) {
+    final String name = config.getName();
     if (this.executorMap.get(name) != null) {
       throw new RuntimeException("An executor service with the name " + name +
         " is already running!");
     }
-    Executor hbes = new Executor(name, maxThreads);
+    Executor hbes = new Executor(config);
     if (this.executorMap.putIfAbsent(name, hbes) != null) {
       throw new RuntimeException("An executor service with the name " + name +
       " is already running (2)!");
@@ -119,33 +120,21 @@ public class ExecutorService {
   }
 
   Executor getExecutor(String name) {
-    Executor executor = this.executorMap.get(name);
-    return executor;
+    return this.executorMap.get(name);
   }
 
   public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
     return getExecutor(type).getThreadPoolExecutor();
   }
 
-  public void startExecutorService(final ExecutorType type, final int maxThreads) {
-    String name = type.getExecutorName(this.servername);
-    if (isExecutorServiceRunning(name)) {
-      LOG.debug("Executor service " + toString() + " already running on " + this.servername);
-      return;
-    }
-    startExecutorService(name, maxThreads);
-  }
-
   /**
    * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all
    * paths should use this method to get the executor, should not start executor by using
-   * {@link ExecutorService#startExecutorService(ExecutorType, int)}
+   * {@link ExecutorService#startExecutorService(ExecutorConfig)}
    */
-  public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) {
-    String name = type.getExecutorName(this.servername);
-    return executorMap
-        .computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads))
-        .getThreadPoolExecutor();
+  public ThreadPoolExecutor getExecutorLazily(ExecutorConfig config) {
+    return executorMap.computeIfAbsent(config.getName(), (executorName) ->
+        new Executor(config)).getThreadPoolExecutor();
   }
 
   public void submit(final EventHandler eh) {
@@ -182,11 +171,71 @@ public class ExecutorService {
   }
 
   /**
+   * Configuration wrapper for {@link Executor}.
+   */
+  public class ExecutorConfig {
+    // Refer to the ThreadPoolExecutor javadoc for details of these configurations.
+    // Argument validation and bound checks delegated to the underlying ThreadPoolExecutor
+    // implementation.
+    public static final long KEEP_ALIVE_TIME_MILLIS_DEFAULT = 1000;
+    private int corePoolSize = -1;
+    private boolean allowCoreThreadTimeout = false;
+    private long keepAliveTimeMillis = KEEP_ALIVE_TIME_MILLIS_DEFAULT;
+    private ExecutorType executorType;
+
+    public ExecutorConfig setExecutorType(ExecutorType type) {
+      this.executorType = type;
+      return this;
+    }
+
+    private ExecutorType getExecutorType() {
+      return Preconditions.checkNotNull(executorType, "ExecutorType not set.");
+    }
+
+    public int getCorePoolSize() {
+      return corePoolSize;
+    }
+
+    public ExecutorConfig setCorePoolSize(int corePoolSize) {
+      this.corePoolSize = corePoolSize;
+      return this;
+    }
+
+    public boolean allowCoreThreadTimeout() {
+      return allowCoreThreadTimeout;
+    }
+
+    /**
+     * Allows timing out of core threads. Good to set this for non-critical thread pools for
+     * release of unused resources. Refer to {@link ThreadPoolExecutor#allowCoreThreadTimeOut}
+     * for additional details.
+     */
+    public ExecutorConfig setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
+      this.allowCoreThreadTimeout = allowCoreThreadTimeout;
+      return this;
+    }
+
+    /**
+     * @return the executor name inferred from the type and the servername on which this is running.
+     */
+    public String getName() {
+      return getExecutorType().getExecutorName(servername);
+    }
+
+    public long getKeepAliveTimeMillis() {
+      return keepAliveTimeMillis;
+    }
+
+    public ExecutorConfig setKeepAliveTimeMillis(long keepAliveTimeMillis) {
+      this.keepAliveTimeMillis = keepAliveTimeMillis;
+      return this;
+    }
+  }
+
+  /**
    * Executor instance.
    */
   static class Executor {
-    // how long to retain excess threads
-    static final long keepAliveTimeInMillis = 1000;
     // the thread pool executor that services the requests
     final TrackingThreadPoolExecutor threadPoolExecutor;
     // work queue to use - unbounded queue
@@ -195,13 +244,15 @@ public class ExecutorService {
     private static final AtomicLong seqids = new AtomicLong(0);
     private final long id;
 
-    protected Executor(String name, int maxThreads) {
+    protected Executor(ExecutorConfig config) {
       this.id = seqids.incrementAndGet();
-      this.name = name;
+      this.name = config.getName();
       // create the thread pool executor
       this.threadPoolExecutor = new TrackingThreadPoolExecutor(
-          maxThreads, maxThreads,
-          keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
+          // setting maxPoolSize > corePoolSize has no effect since we use an unbounded task queue.
+          config.getCorePoolSize(), config.getCorePoolSize(),
+          config.getKeepAliveTimeMillis(), TimeUnit.MILLISECONDS, q);
+      this.threadPoolExecutor.allowCoreThreadTimeOut(config.allowCoreThreadTimeout());
       // name the threads for this threadpool
       ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
       tfb.setNameFormat(this.name + "-%d");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 9fa6653..d3e6946 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.favored.FavoredNodesManager;
 import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
@@ -1293,30 +1294,45 @@ public class HMaster extends HRegionServer implements MasterServices {
    */
   private void startServiceThreads() throws IOException {
     // Start the executor service pools
-    this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt(
-      HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT));
-    this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt(
-      HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT));
-    this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
-      conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
-        HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT));
-    this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
-      conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
-        HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
-    this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
-      HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
-    this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
-      SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
-    this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS, conf.getInt(
-        HConstants.MASTER_MERGE_DISPATCH_THREADS,
-        HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT));
+    final int masterOpenRegionPoolSize = conf.getInt(
+        HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.MASTER_OPEN_REGION).setCorePoolSize(masterOpenRegionPoolSize));
+    final int masterCloseRegionPoolSize = conf.getInt(
+        HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.MASTER_CLOSE_REGION).setCorePoolSize(masterCloseRegionPoolSize));
+    final int masterServerOpThreads = conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
+        HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(masterServerOpThreads));
+    final int masterServerMetaOpsThreads = conf.getInt(
+        HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
+        HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.MASTER_META_SERVER_OPERATIONS).setCorePoolSize(masterServerMetaOpsThreads));
+    final int masterLogReplayThreads = conf.getInt(
+        HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.M_LOG_REPLAY_OPS).setCorePoolSize(masterLogReplayThreads));
+    final int masterSnapshotThreads = conf.getInt(
+        SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.MASTER_SNAPSHOT_OPERATIONS).setCorePoolSize(masterSnapshotThreads)
+        .setAllowCoreThreadTimeout(true));
+    final int masterMergeDispatchThreads = conf.getInt(HConstants.MASTER_MERGE_DISPATCH_THREADS,
+        HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.MASTER_MERGE_OPERATIONS).setCorePoolSize(masterMergeDispatchThreads)
+        .setAllowCoreThreadTimeout(true));
 
     // We depend on there being only one instance of this executor running
     // at a time. To do concurrency, would need fencing of enable/disable of
     // tables.
     // Any time changing this maxThreads to > 1, pls see the comment at
     // AccessController#postCompletedCreateTableAction
-    this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1));
     startProcedureExecutor();
 
     // Create cleaner thread pool
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 1909816..a8931de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.http.InfoServer;
@@ -2026,34 +2027,56 @@ public class HRegionServer extends Thread implements
     choreService.scheduleChore(compactedFileDischarger);
 
     // Start executor services
-    this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
-        conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
-    this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
-        conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
-    this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
-        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
-    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
-        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
-    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
-        conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
+    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
+    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
+    final int openPriorityRegionThreads =
+        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.RS_OPEN_PRIORITY_REGION).setCorePoolSize(openPriorityRegionThreads));
+    final int closeRegionThreads =
+        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
+    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
-      this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
-          conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
-    }
-    this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
-        HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
+      final int storeScannerParallelSeekThreads =
+          conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
+      executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+          ExecutorType.RS_PARALLEL_SEEK).setCorePoolSize(storeScannerParallelSeekThreads)
+          .setAllowCoreThreadTimeout(true));
+    }
+    final int logReplayOpsThreads = conf.getInt(
+        HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(logReplayOpsThreads)
+        .setAllowCoreThreadTimeout(true));
     // Start the threads for compacted files discharger
-    this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
-        conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
+    final int compactionDischargerThreads =
+        conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.RS_COMPACTED_FILES_DISCHARGER).setCorePoolSize(compactionDischargerThreads));
     if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
-      this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
-          conf.getInt("hbase.regionserver.region.replica.flusher.threads",
-              conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
-    }
-    this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
-      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
-    this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE,
-      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1));
+      final int regionReplicaFlushThreads = conf.getInt(
+          "hbase.regionserver.region.replica.flusher.threads", conf.getInt(
+              "hbase.regionserver.executor.openregion.threads", 3));
+      executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+          ExecutorType.RS_REGION_REPLICA_FLUSH_OPS).setCorePoolSize(regionReplicaFlushThreads));
+    }
+    final int refreshPeerThreads =
+        conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
+
+    final int switchRpcThrottleThreads =
+        conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.RS_SWITCH_RPC_THROTTLE).setCorePoolSize(switchRpcThrottleThreads));
 
     Threads
       .setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
index 06795a5..eda04da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
@@ -22,6 +22,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -96,8 +98,10 @@ public class RegionServicesForStores {
 
   ThreadPoolExecutor getInMemoryCompactionPool() {
     if (rsServices != null) {
-      return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION,
-        inMemoryPoolSize);
+      ExecutorService executorService = rsServices.getExecutorService();
+      ExecutorConfig config = executorService.new ExecutorConfig().setExecutorType(
+          ExecutorType.RS_IN_MEMORY_COMPACTION).setCorePoolSize(inMemoryPoolSize);
+      return executorService.getExecutorLazily(config);
     } else {
       // this could only happen in tests
       return getInMemoryCompactionPoolForTest();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
index c93e951..6b58d07 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -70,8 +71,8 @@ public class TestExecutorService {
 
     // Start an executor service pool with max 5 threads
     ExecutorService executorService = new ExecutorService("unit_test");
-    executorService.startExecutorService(
-      ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(maxThreads));
 
     Executor executor =
       executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
@@ -138,7 +139,7 @@ public class TestExecutorService {
     }
 
     // Make sure threads are still around even after their timetolive expires.
-    Thread.sleep(ExecutorService.Executor.keepAliveTimeInMillis * 2);
+    Thread.sleep(ExecutorConfig.KEEP_ALIVE_TIME_MILLIS_DEFAULT * 2);
     assertEquals(maxThreads, pool.getPoolSize());
 
     executorService.shutdown();
@@ -196,8 +197,8 @@ public class TestExecutorService {
     when(server.getConfiguration()).thenReturn(conf);
 
     ExecutorService executorService = new ExecutorService("unit_test");
-    executorService.startExecutorService(
-      ExecutorType.MASTER_SERVER_OPERATIONS, 1);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.MASTER_SERVER_OPERATIONS).setCorePoolSize(1));
 
 
     executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) {
@@ -229,7 +230,8 @@ public class TestExecutorService {
     when(server.getConfiguration()).thenReturn(conf);
 
     ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
-    executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, 1);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.MASTER_SNAPSHOT_OPERATIONS).setCorePoolSize(1));
 
     CountDownLatch latch = new CountDownLatch(1);
     CountDownLatch waitForEventToStart = new CountDownLatch(1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 4be04f9..7b62119 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -60,6 +60,8 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
+import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
@@ -190,8 +192,8 @@ public class TestHRegionReplayEvents {
     String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
         .toString();
     ExecutorService es = new ExecutorService(string);
-    es.startExecutorService(
-      string+"-"+string, 1);
+    es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1).setExecutorType(
+        ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
     when(rss.getExecutorService()).thenReturn(es);
     primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
     primaryRegion.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index 8388f2d..e58a37b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -212,7 +213,8 @@ public class TestSplitLogWorker {
 
     SplitLogCounters.resetCounters();
     executorService = new ExecutorService("TestSplitLogWorker");
-    executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
+    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
+        ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(10));
   }
 
   @After