You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/02/01 02:02:16 UTC

[hbase] branch master updated: HBASE-21764 Size of in-memory compaction thread pool should be configurable

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 81ebe6d  HBASE-21764 Size of in-memory compaction thread pool should be configurable
81ebe6d is described below

commit 81ebe6d40d485a3aee4451acc340cf6db62609b7
Author: huzheng <op...@gmail.com>
AuthorDate: Wed Jan 30 16:15:02 2019 +0800

    HBASE-21764 Size of in-memory compaction thread pool should be configurable
---
 .../hadoop/hbase/executor/ExecutorService.java     | 16 +++++++-
 .../apache/hadoop/hbase/executor/ExecutorType.java |  3 +-
 .../hbase/regionserver/CompactingMemStore.java     |  4 ++
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  4 +-
 .../regionserver/RegionServicesForStores.java      | 35 +++++++++---------
 .../hbase/regionserver/TestCompactingMemStore.java |  9 ++++-
 .../hadoop/hbase/regionserver/TestHStore.java      |  4 ++
 .../TestRecoveredEditsReplayAndAbort.java          |  1 +
 .../TestWalAndCompactingMemStoreFlush.java         | 43 +++++++---------------
 .../regionserver/wal/AbstractTestWALReplay.java    |  1 +
 10 files changed, 68 insertions(+), 52 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 4f8909e..71d8ea5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -133,13 +134,24 @@ public class ExecutorService {
   public void startExecutorService(final ExecutorType type, final int maxThreads) {
     String name = type.getExecutorName(this.servername);
     if (isExecutorServiceRunning(name)) {
-      LOG.debug("Executor service " + toString() + " already running on " +
-          this.servername);
+      LOG.debug("Executor service " + toString() + " already running on " + this.servername);
       return;
     }
     startExecutorService(name, maxThreads);
   }
 
+  /**
+   * Initialize the executor lazily. Note: if an executor needs to be initialized lazily, then all
+   * paths should use this method to get the executor and should not start the executor by using
+   * {@link ExecutorService#startExecutorService(ExecutorType, int)}.
+   */
+  public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) {
+    String name = type.getExecutorName(this.servername);
+    return executorMap
+        .computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads))
+        .getThreadPoolExecutor();
+  }
+
   public void submit(final EventHandler eh) {
     Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
     if (executor == null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index 819f369..d354d62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -49,7 +49,8 @@ public enum ExecutorType {
   RS_OPEN_PRIORITY_REGION    (30),
   RS_REFRESH_PEER(31),
   RS_REPLAY_SYNC_REPLICATION_WAL(32),
-  RS_SWITCH_RPC_THROTTLE(33);
+  RS_SWITCH_RPC_THROTTLE(33),
+  RS_IN_MEMORY_COMPACTION(34);
 
   ExecutorType(int value) {
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 834d9c1..64c2c53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -63,6 +63,10 @@ public class CompactingMemStore extends AbstractMemStore {
   public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
       "hbase.memstore.inmemoryflush.threshold.factor";
   private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1;
+  // In-Memory compaction pool size
+  public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY =
+      "hbase.regionserver.inmemory.compaction.pool.size";
+  public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10;
 
   private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
   private HStore store;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6b36fa9..c95f61d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -298,7 +298,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   // Track data size in all memstores
   private final MemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
-  private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this);
+  @VisibleForTesting
+  RegionServicesForStores regionServicesForStores;
 
   // Debug possible data loss due to WAL off
   final LongAdder numMutationsWithoutWAL = new LongAdder();
@@ -784,6 +785,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       this.blockCache = rsServices.getBlockCache().orElse(null);
       this.mobFileCache = rsServices.getMobFileCache().orElse(null);
     }
+    this.regionServicesForStores = new RegionServicesForStores(this, rsServices);
 
     setHTableSpecificConf();
     this.scannerReadPoints = new ConcurrentHashMap<>();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
index 91b23b3..ae9977e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
@@ -18,12 +18,10 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -39,22 +37,18 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 @InterfaceAudience.Private
 public class RegionServicesForStores {
 
-  private static final int POOL_SIZE = 10;
-  private static final ThreadPoolExecutor INMEMORY_COMPACTION_POOL =
-      new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 60, TimeUnit.SECONDS,
-          new LinkedBlockingQueue<>(),
-          new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-              String name = Thread.currentThread().getName() + "-inmemoryCompactions-" +
-                  System.currentTimeMillis();
-              return new Thread(r, name);
-            }
-          });
   private final HRegion region;
+  private final RegionServerServices rsServices;
+  private int inMemoryPoolSize;
 
-  public RegionServicesForStores(HRegion region) {
+  public RegionServicesForStores(HRegion region, RegionServerServices rsServices) {
     this.region = region;
+    this.rsServices = rsServices;
+    if (this.rsServices != null) {
+      this.inMemoryPoolSize = rsServices.getConfiguration().getInt(
+        CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_KEY,
+        CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT);
+    }
   }
 
   public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta,
@@ -70,7 +64,14 @@ public class RegionServicesForStores {
     return region.getWAL();
   }
 
-  public ThreadPoolExecutor getInMemoryCompactionPool() { return INMEMORY_COMPACTION_POOL; }
+  ThreadPoolExecutor getInMemoryCompactionPool() {
+    if (rsServices != null) {
+      return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION,
+        inMemoryPoolSize);
+    } else {
+      return null;
+    }
+  }
 
   public long getMemStoreFlushSize() {
     return region.getMemStoreFlushSize();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 3d5a8ff..501d5cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -25,6 +25,9 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -56,6 +59,7 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,8 +115,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
         new HRegionInfo(TableName.valueOf("foobar"), null, null, false);
     WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info);
     this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true);
-    //this.region = hbaseUtility.createTestRegion("foobar", hcd);
-    this.regionServicesForStores = region.getRegionServicesForStores();
+    this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores());
+    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
+    Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
     this.store = new HStore(region, hcd, conf);
 
     long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 7433769..786334e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -220,6 +221,9 @@ public class TestHStore {
     WALFactory wals = new WALFactory(walConf, methodName);
     region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf,
         htd, null);
+    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
+    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
+    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
   }
 
   private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
index 7aeff84..17118a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
@@ -122,6 +122,7 @@ public class TestRecoveredEditsReplayAndAbort {
     Mockito.when(rs.getNonceManager()).thenReturn(null);
     Mockito.when(rs.getServerName()).thenReturn(ServerName
         .valueOf("test", 0, 111));
+    Mockito.when(rs.getConfiguration()).thenReturn(CONF);
     //create a region
     TableName testTable = TableName.valueOf("testRecoveredEidtsReplayAndAbort");
     TableDescriptor htd = TableDescriptorBuilder.newBuilder(testTable)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index 15bf2a4..edd8382 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -18,19 +18,17 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -40,6 +38,7 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
 /**
  * This test verifies the correctness of the Per Column Family flushing strategy
@@ -67,14 +66,14 @@ public class TestWalAndCompactingMemStoreFlush {
   private Configuration conf;
 
   private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
-    int i=0;
+    int i = 0;
     HTableDescriptor htd = new HTableDescriptor(TABLENAME);
     for (byte[] family : FAMILIES) {
       HColumnDescriptor hcd = new HColumnDescriptor(family);
       // even column families are going to have compacted memstore
-      if(i%2 == 0) {
-        hcd.setInMemoryCompaction(MemoryCompactionPolicy.valueOf(
-            conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
+      if (i % 2 == 0) {
+        hcd.setInMemoryCompaction(MemoryCompactionPolicy
+            .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
       } else {
         hcd.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
       }
@@ -84,7 +83,12 @@ public class TestWalAndCompactingMemStoreFlush {
 
     HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
     Path path = new Path(DIR, callingMethod);
-    return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd);
+    HRegion region = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd, false);
+    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
+    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
+    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
+    region.initialize(null);
+    return region;
   }
 
   // A helper function to create puts.
@@ -109,31 +113,12 @@ public class TestWalAndCompactingMemStoreFlush {
     return p;
   }
 
-  // A helper function to create gets.
-  private Get createGet(int familyNum, int putNum) {
-    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
-    return new Get(row);
-  }
-
   private void verifyInMemoryFlushSize(Region region) {
     assertEquals(
       ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(),
       ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).getInmemoryFlushSize());
   }
 
-  // A helper function to verify edits.
-  void verifyEdit(int familyNum, int putNum, Table table) throws IOException {
-    Result r = table.get(createGet(familyNum, putNum));
-    byte[] family = FAMILIES[familyNum - 1];
-    byte[] qf = Bytes.toBytes("q" + familyNum);
-    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
-    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
-    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
-      r.getFamilyMap(family).get(qf));
-    assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
-      Arrays.equals(r.getFamilyMap(family).get(qf), val));
-  }
-
   @Before
   public void setup() {
     conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 3f9040b..f2fd591 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -681,6 +681,7 @@ public abstract class AbstractTestWALReplay {
     RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
     Mockito.doReturn(false).when(rsServices).isAborted();
     when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
+    when(rsServices.getConfiguration()).thenReturn(conf);
     Configuration customConf = new Configuration(this.conf);
     customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
         CustomStoreFlusher.class.getName());