You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by to...@apache.org on 2020/08/05 09:49:13 UTC

[hbase] 06/09: fixed missing locks and thread pool in ConnectionImpl

This is an automated email from the ASF dual-hosted git repository.

toffer pushed a commit to branch HBASE-11288.branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit df0b07dfc56bb15a6b66a4b3e97e9b3835aa5607
Author: Francis Liu <to...@apache.org>
AuthorDate: Tue Aug 4 03:17:40 2020 -0700

    fixed missing locks and thread pool in ConnectionImpl
---
 .../hbase/client/ConnectionImplementation.java     | 38 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 5 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 4d15bf9..21cedfe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -182,7 +182,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   // package protected for the tests
   ClusterStatusListener clusterStatusListener;
 
-  private final Object metaRegionLock = new Object();
+  private final Object rootRegionLock = new Object();
 
   private final Object masterLock = new Object();
 
@@ -191,6 +191,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   private volatile ThreadPoolExecutor batchPool = null;
   // meta thread executor shared by all Table instances created
   // by this connection
+  private volatile ThreadPoolExecutor rootLookupPool = null;
   private volatile ThreadPoolExecutor metaLookupPool = null;
   private volatile boolean cleanupPool = false;
 
@@ -229,6 +230,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
 
   /** lock guards against multiple threads trying to query the meta region at the same time */
   private final ReentrantLock userRegionLock = new ReentrantLock();
+  private final ReentrantLock metaRegionLock = new ReentrantLock();
+
 
   private ChoreService authService;
 
@@ -501,6 +504,25 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     return tpe;
   }
 
+  private ThreadPoolExecutor getRootLookupPool() {
+    if (this.rootLookupPool == null) {
+      synchronized (this) {
+        if (this.rootLookupPool == null) {
+          //Some of the threads would be used for root replicas
+          //To start with, threads.max.core threads can hit the root (including replicas).
+          //After that, requests will get queued up in the passed queue, and only after
+          //the queue is full, a new thread will be started
+          int threads = conf.getInt("hbase.hconnection.root.lookup.threads.max", 128);
+          this.rootLookupPool = getThreadPool(
+            threads,
+            threads,
+            "-rootLookup-shared-", new LinkedBlockingQueue<>());
+        }
+      }
+    }
+    return this.rootLookupPool;
+  }
+
   private ThreadPoolExecutor getMetaLookupPool() {
     if (this.metaLookupPool == null) {
       synchronized (this) {
@@ -801,7 +823,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     }
 
     // only one thread should do the lookup.
-    synchronized (metaRegionLock) {
+    synchronized (rootRegionLock) {
       // Check the cache again for a hit in case some other thread made the
       // same query while we were waiting on the lock.
       if (useCache) {
@@ -872,7 +894,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       }
       // Query the meta region
       long pauseBase = this.pause;
-      userRegionLock.lock();
+      ReentrantLock regionLock = userRegionLock;
+      if (parentTable.equals(TableName.ROOT_TABLE_NAME)) {
+        regionLock = metaRegionLock;
+      }
+      regionLock.lock();
       try {
         if (useCache) {// re-check cache after get lock
           RegionLocations locations = getCachedLocation(tableName, row);
@@ -885,9 +911,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
             RegionInfo.DEFAULT_REPLICA_ID);
         }
         s.resetMvccReadPoint();
+        ThreadPoolExecutor lookupPool = parentTable.equals(TableName.ROOT_TABLE_NAME) ?
+          getRootLookupPool() : getMetaLookupPool();
         try (ReversedClientScanner rcs =
           new ReversedClientScanner(conf, s, parentTable, this, rpcCallerFactory,
-            rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
+            rpcControllerFactory, lookupPool, metaReplicaCallTimeoutScanInMicroSecond)) {
           boolean tableNotFound = true;
           for (;;) {
             Result regionInfoRow = rcs.next();
@@ -966,7 +994,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
         relocateParent =
           !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
       } finally {
-        userRegionLock.unlock();
+        regionLock.unlock();
       }
       try{
         Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));