You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by to...@apache.org on 2020/08/05 09:49:13 UTC
[hbase] 06/09: fixed missing locks and thread pool in ConnectionImpl
This is an automated email from the ASF dual-hosted git repository.
toffer pushed a commit to branch HBASE-11288.branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit df0b07dfc56bb15a6b66a4b3e97e9b3835aa5607
Author: Francis Liu <to...@apache.org>
AuthorDate: Tue Aug 4 03:17:40 2020 -0700
fixed missing locks and thread pool in ConnectionImpl
---
.../hbase/client/ConnectionImplementation.java | 38 +++++++++++++++++++---
1 file changed, 33 insertions(+), 5 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 4d15bf9..21cedfe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -182,7 +182,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// package protected for the tests
ClusterStatusListener clusterStatusListener;
- private final Object metaRegionLock = new Object();
+ private final Object rootRegionLock = new Object();
private final Object masterLock = new Object();
@@ -191,6 +191,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
private volatile ThreadPoolExecutor batchPool = null;
// meta thread executor shared by all Table instances created
// by this connection
+ private volatile ThreadPoolExecutor rootLookupPool = null;
private volatile ThreadPoolExecutor metaLookupPool = null;
private volatile boolean cleanupPool = false;
@@ -229,6 +230,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
/** lock guards against multiple threads trying to query the meta region at the same time */
private final ReentrantLock userRegionLock = new ReentrantLock();
+ private final ReentrantLock metaRegionLock = new ReentrantLock();
+
private ChoreService authService;
@@ -501,6 +504,25 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return tpe;
}
+ private ThreadPoolExecutor getRootLookupPool() {
+ if (this.rootLookupPool == null) {
+ synchronized (this) {
+ if (this.rootLookupPool == null) {
+ //Some of the threads would be used for root replicas
+ //To start with, threads.max.core threads can hit the root (including replicas).
+ //After that, requests will get queued up in the passed queue, and only after
+ //the queue is full, a new thread will be started
+ int threads = conf.getInt("hbase.hconnection.root.lookup.threads.max", 128);
+ this.rootLookupPool = getThreadPool(
+ threads,
+ threads,
+ "-rootLookup-shared-", new LinkedBlockingQueue<>());
+ }
+ }
+ }
+ return this.rootLookupPool;
+ }
+
private ThreadPoolExecutor getMetaLookupPool() {
if (this.metaLookupPool == null) {
synchronized (this) {
@@ -801,7 +823,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
// only one thread should do the lookup.
- synchronized (metaRegionLock) {
+ synchronized (rootRegionLock) {
// Check the cache again for a hit in case some other thread made the
// same query while we were waiting on the lock.
if (useCache) {
@@ -872,7 +894,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
// Query the meta region
long pauseBase = this.pause;
- userRegionLock.lock();
+ ReentrantLock regionLock = userRegionLock;
+ if (parentTable.equals(TableName.ROOT_TABLE_NAME)) {
+ regionLock = metaRegionLock;
+ }
+ regionLock.lock();
try {
if (useCache) {// re-check cache after get lock
RegionLocations locations = getCachedLocation(tableName, row);
@@ -885,9 +911,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
RegionInfo.DEFAULT_REPLICA_ID);
}
s.resetMvccReadPoint();
+ ThreadPoolExecutor lookupPool = parentTable.equals(TableName.ROOT_TABLE_NAME) ?
+ getRootLookupPool() : getMetaLookupPool();
try (ReversedClientScanner rcs =
new ReversedClientScanner(conf, s, parentTable, this, rpcCallerFactory,
- rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
+ rpcControllerFactory, lookupPool, metaReplicaCallTimeoutScanInMicroSecond)) {
boolean tableNotFound = true;
for (;;) {
Result regionInfoRow = rcs.next();
@@ -966,7 +994,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
relocateParent =
!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
} finally {
- userRegionLock.unlock();
+ regionLock.unlock();
}
try{
Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));