You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/03/16 22:15:37 UTC

hbase git commit: HBASE-17792 Use a shared thread pool for AtomicityWriter, AtomicGetReader, AtomicScanReader's connections in TestAcidGuarantees (Huaxiang Sun)

Repository: hbase
Updated Branches:
  refs/heads/master 6fb44f7eb -> 7c19490ba


HBASE-17792 Use a shared thread pool for AtomicityWriter, AtomicGetReader, AtomicScanReader's connections in TestAcidGuarantees (Huaxiang Sun)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7c19490b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7c19490b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7c19490b

Branch: refs/heads/master
Commit: 7c19490bac854c4da6457b1edadb1e244924fa3d
Parents: 6fb44f7
Author: Michael Stack <st...@apache.org>
Authored: Thu Mar 16 15:15:28 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Thu Mar 16 15:15:28 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/TestAcidGuarantees.java | 51 ++++++++++++++++----
 1 file changed, 42 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7c19490b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
index 569ca89..15250ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
@@ -21,6 +21,11 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -43,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -85,6 +91,7 @@ public class TestAcidGuarantees implements Tool {
 
   // when run as main
   private Configuration conf;
+  private ExecutorService sharedPool = null;
 
   private void createTableIfMissing(boolean useMob)
     throws IOException {
@@ -117,12 +124,38 @@ public class TestAcidGuarantees implements Tool {
       conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.9);
     }
     util = new HBaseTestingUtility(conf);
+    sharedPool = createThreadPool();
   }
 
   public void setHBaseTestingUtil(HBaseTestingUtility util) {
     this.util = util;
   }
 
+  private ExecutorService createThreadPool() {
+
+    int maxThreads = 256;
+    int coreThreads = 128;
+
+    long keepAliveTime = 60;
+    BlockingQueue<Runnable> workQueue =
+          new LinkedBlockingQueue<Runnable>(maxThreads *
+              HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
+
+    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
+        coreThreads,
+        maxThreads,
+        keepAliveTime,
+        TimeUnit.SECONDS,
+        workQueue,
+        Threads.newDaemonThreadFactory(toString() + "-shared"));
+    tpe.allowCoreThreadTimeOut(true);
+    return tpe;
+  }
+
+  public ExecutorService getSharedThreadPool() {
+    return sharedPool;
+  }
+
   /**
    * Thread that does random full-row writes into a table.
    */
@@ -136,11 +169,11 @@ public class TestAcidGuarantees implements Tool {
     AtomicLong numWritten = new AtomicLong();
 
     public AtomicityWriter(TestContext ctx, byte targetRows[][],
-                           byte targetFamilies[][]) throws IOException {
+                           byte targetFamilies[][], ExecutorService pool) throws IOException {
       super(ctx);
       this.targetRows = targetRows;
       this.targetFamilies = targetFamilies;
-      connection = ConnectionFactory.createConnection(ctx.getConf());
+      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
       table = connection.getTable(TABLE_NAME);
     }
     public void doAnAction() throws Exception {
@@ -182,11 +215,11 @@ public class TestAcidGuarantees implements Tool {
     AtomicLong numRead = new AtomicLong();
 
     public AtomicGetReader(TestContext ctx, byte targetRow[],
-                           byte targetFamilies[][]) throws IOException {
+                           byte targetFamilies[][], ExecutorService pool) throws IOException {
       super(ctx);
       this.targetRow = targetRow;
       this.targetFamilies = targetFamilies;
-      connection = ConnectionFactory.createConnection(ctx.getConf());
+      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
       table = connection.getTable(TABLE_NAME);
     }
 
@@ -251,10 +284,10 @@ public class TestAcidGuarantees implements Tool {
     AtomicLong numRowsScanned = new AtomicLong();
 
     public AtomicScanReader(TestContext ctx,
-                           byte targetFamilies[][]) throws IOException {
+                           byte targetFamilies[][], ExecutorService pool) throws IOException {
       super(ctx);
       this.targetFamilies = targetFamilies;
-      connection = ConnectionFactory.createConnection(ctx.getConf());
+      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
       table = connection.getTable(TABLE_NAME);
     }
 
@@ -344,7 +377,7 @@ public class TestAcidGuarantees implements Tool {
     List<AtomicityWriter> writers = Lists.newArrayList();
     for (int i = 0; i < numWriters; i++) {
       AtomicityWriter writer = new AtomicityWriter(
-          ctx, rows, FAMILIES);
+          ctx, rows, FAMILIES, getSharedThreadPool());
       writers.add(writer);
       ctx.addThread(writer);
     }
@@ -372,14 +405,14 @@ public class TestAcidGuarantees implements Tool {
     List<AtomicGetReader> getters = Lists.newArrayList();
     for (int i = 0; i < numGetters; i++) {
       AtomicGetReader getter = new AtomicGetReader(
-          ctx, rows[i % numUniqueRows], FAMILIES);
+          ctx, rows[i % numUniqueRows], FAMILIES, getSharedThreadPool());
       getters.add(getter);
       ctx.addThread(getter);
     }
 
     List<AtomicScanReader> scanners = Lists.newArrayList();
     for (int i = 0; i < numScanners; i++) {
-      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
+      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, getSharedThreadPool());
       scanners.add(scanner);
       ctx.addThread(scanner);
     }