You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2011/04/18 22:58:43 UTC

svn commit: r1094741 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/client/HTable.java src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java

Author: jdcryans
Date: Mon Apr 18 20:58:42 2011
New Revision: 1094741

URL: http://svn.apache.org/viewvc?rev=1094741&view=rev
Log:
HBASE-3767  Improve how HTable handles threads used for multi actions

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1094741&r1=1094740&r2=1094741&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Mon Apr 18 20:58:42 2011
@@ -231,6 +231,7 @@ Release 0.90.3 - Unreleased
   IMPROVEMENTS
    HBASE-3747  ReplicationSource should differanciate remote and local exceptions
    HBASE-3652  Speed up tests by lowering some sleeps
+   HBASE-3767  Improve how HTable handles threads used for multi actions
 
   TASKS
    HBASE-3748  Add rolling of thrift/rest daemons to graceful_stop.sh script

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1094741&r1=1094740&r2=1094741&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Mon Apr 18 20:58:42 2011
@@ -31,7 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -58,8 +58,6 @@ import org.apache.hadoop.hbase.ipc.ExecR
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * Used to communicate with a single HBase table.
@@ -184,18 +182,18 @@ public class HTable implements HTableInt
       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
     this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
 
-    int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS());
-    if (nrThreads == 0) {
-      nrThreads = 1; // is there a better default?
+    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
+    if (maxThreads == 0) {
+      maxThreads = 1; // is there a better default?
     }
 
-    // Unfortunately Executors.newCachedThreadPool does not allow us to
-    // set the maximum size of the pool, so we have to do it ourselves.
-    // Must also set set corethreadpool size as with a LinkedBlockingQueue,
-    // a new thread will not be started until the queue is full
-    this.pool = new ThreadPoolExecutor(nrThreads, nrThreads,
+    // Using the "direct handoff" approach, new threads are only created when
+    // necessary, and the pool can grow unbounded. This could be bad, but in HCM
+    // we only create as many Runnables as there are region servers, so the
+    // pool also scales when new region servers are added.
+    this.pool = new ThreadPoolExecutor(1, maxThreads,
         60, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<Runnable>(),
+        new SynchronousQueue<Runnable>(),
         new DaemonThreadFactory());
     ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
   }
@@ -205,21 +203,6 @@ public class HTable implements HTableInt
   }
 
   /**
-   * @return the number of region servers that are currently running
-   * @throws IOException if a remote or network exception occurs
-   */
-  public int getCurrentNrHRS() throws IOException {
-    try {
-      // We go to zk rather than to master to get count of regions to avoid
-      // HTable having a Master dependency.  See HBase-2828
-      return ZKUtil.getNumberOfChildren(this.connection.getZooKeeperWatcher(),
-          this.connection.getZooKeeperWatcher().rsZNode);
-    } catch (KeeperException ke) {
-      throw new IOException("Unexpected ZooKeeper exception", ke);
-    }
-  }
-
-  /**
    * Tells whether or not a table is enabled or not.
    * @param tableName Name of table to check.
    * @return {@code true} if table is online.
@@ -1258,6 +1241,14 @@ public class HTable implements HTableInt
     }
   }
 
+  /**
+   * The pool is used for multi requests for this HTable
+   * @return the pool used for multi
+   */
+  ExecutorService getPool() {
+    return this.pool;
+  }
+
   static class DaemonThreadFactory implements ThreadFactory {
     static final AtomicInteger poolNumber = new AtomicInteger(1);
         final ThreadGroup group;

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1094741&r1=1094740&r2=1094741&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Mon Apr 18 20:58:42 2011
@@ -34,9 +34,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.UUID;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -512,7 +515,7 @@ public class TestFromClientSide {
     assertEquals(count, 10);
     scanner.close();
   }
-  
+
   /**
    * Test simple table and non-existent row cases.
    */
@@ -3970,5 +3973,65 @@ public class TestFromClientSide {
       assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1));
     }
   }
+
+  /**
+   * This test demonstrates how we use ThreadPoolExecutor.
+   * It needs to show that we only use as many threads in the pool as we have
+   * region servers. To do this, instead of doing real requests, we use a
+   * SynchronousQueue where each put must wait for a take (and vice versa)
+   * so that way we have full control of the number of active threads.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testPoolBehavior() throws IOException, InterruptedException {
+    byte[] someBytes = Bytes.toBytes("pool");
+    HTable table = TEST_UTIL.createTable(someBytes, someBytes);
+    ThreadPoolExecutor pool = (ThreadPoolExecutor)table.getPool();
+
+    // Make sure that the TPE starts with a core pool size of one and 0
+    // initialized worker threads
+    assertEquals(1, pool.getCorePoolSize());
+    assertEquals(0, pool.getPoolSize());
+
+    // Build a SynchronousQueue that we use for thread coordination
+    final SynchronousQueue<Object> queue = new SynchronousQueue<Object>();
+    List<Thread> threads = new ArrayList<Thread>(5);
+    for (int i = 0; i < 5; i++) {
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            // The thread blocks here until we decide to let it go
+            queue.take();
+          } catch (InterruptedException ie) { }
+        }
+      });
+    }
+    // First, add two threads and make sure the pool size follows
+    pool.submit(threads.get(0));
+    assertEquals(1, pool.getPoolSize());
+    pool.submit(threads.get(1));
+    assertEquals(2, pool.getPoolSize());
+
+    // Next, terminate those threads and then make sure the pool is still the
+    // same size
+    queue.put(new Object());
+    threads.get(0).join();
+    queue.put(new Object());
+    threads.get(1).join();
+    assertEquals(2, pool.getPoolSize());
+
+    // Now let's simulate adding a RS meaning that we'll go up to three
+    // concurrent threads. The pool should not grow larger than three.
+    pool.submit(threads.get(2));
+    pool.submit(threads.get(3));
+    pool.submit(threads.get(4));
+    assertEquals(3, pool.getPoolSize());
+    queue.put(new Object());
+    queue.put(new Object());
+    queue.put(new Object());
+  }
+
+
 }