You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/08/07 02:41:33 UTC
hbase git commit: HBASE-18485 Performance issue: ClientAsyncPrefetchScanner is slower than ClientSimpleScanner
Repository: hbase
Updated Branches:
refs/heads/master 2a717459b -> 5915d73a7
HBASE-18485 Performance issue: ClientAsyncPrefetchScanner is slower than ClientSimpleScanner
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5915d73a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5915d73a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5915d73a
Branch: refs/heads/master
Commit: 5915d73a70eb69adc639062f372559c9fc5130be
Parents: 2a71745
Author: Guanghao Zhang <zg...@apache.org>
Authored: Fri Aug 4 23:10:18 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Mon Aug 7 10:35:19 2017 +0800
----------------------------------------------------------------------
.../client/ClientAsyncPrefetchScanner.java | 181 ++++++-------------
.../hadoop/hbase/client/ClientScanner.java | 6 +-
.../client/TestScannersFromClientSide.java | 1 +
3 files changed, 64 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5915d73a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index 34c5620..e8da18f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -18,15 +18,20 @@
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
@@ -48,25 +53,18 @@ import org.apache.hadoop.hbase.util.Threads;
@InterfaceAudience.Private
public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
- private static final int ESTIMATED_SINGLE_RESULT_SIZE = 1024;
- private static final int DEFAULT_QUEUE_CAPACITY = 1024;
-
- private int cacheCapacity;
+ private long maxCacheSize;
private AtomicLong cacheSizeInBytes;
// exception queue (from prefetch to main scan execution)
private Queue<Exception> exceptionsQueue;
- // prefetch runnable object to be executed asynchronously
- private PrefetchRunnable prefetchRunnable;
- // Boolean flag to ensure only a single prefetch is running (per scan)
- // We use atomic boolean to allow multiple concurrent threads to
- // consume records from the same cache, but still have a single prefetcher thread.
- // For a single consumer thread this can be replace with a native boolean.
- private AtomicBoolean prefetchRunning;
- // an attribute for synchronizing close between scanner and prefetch threads
- private AtomicLong closingThreadId;
+ // prefetch thread to be executed asynchronously
+ private Thread prefetcher;
// used for testing
private Consumer<Boolean> prefetchListener;
- private static final int NO_THREAD = -1;
+
+ private final Lock lock = new ReentrantLock();
+ private final Condition notEmpty = lock.newCondition();
+ private final Condition notFull = lock.newCondition();
public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
@@ -84,82 +82,56 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
@Override
protected void initCache() {
// concurrent cache
- cacheCapacity = calcCacheCapacity();
+ maxCacheSize = resultSize2CacheSize(maxScannerResultSize);
cache = new LinkedBlockingQueue<>();
cacheSizeInBytes = new AtomicLong(0);
exceptionsQueue = new ConcurrentLinkedQueue<>();
- prefetchRunnable = new PrefetchRunnable();
- prefetchRunning = new AtomicBoolean(false);
- closingThreadId = new AtomicLong(NO_THREAD);
+ prefetcher = new Thread(new PrefetchRunnable());
+ Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher");
+ }
+
+ private long resultSize2CacheSize(long maxResultSize) {
+ // * 2 if possible
+ return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
}
@Override
public Result next() throws IOException {
-
try {
- boolean hasExecutedPrefetch = false;
- do {
+ lock.lock();
+ while (cache.isEmpty()) {
handleException();
-
- // If the scanner is closed and there's nothing left in the cache, next is a no-op.
- if (getCacheCount() == 0 && this.closed) {
+ if (this.closed) {
return null;
}
-
- if (prefetchCondition()) {
- // run prefetch in the background only if no prefetch is already running
- if (!isPrefetchRunning()) {
- if (prefetchRunning.compareAndSet(false, true)) {
- getPool().execute(prefetchRunnable);
- hasExecutedPrefetch = true;
- }
- }
- }
-
- while (isPrefetchRunning()) {
- // prefetch running or still pending
- if (getCacheCount() > 0) {
- return pollCache();
- } else {
- // (busy) wait for a record - sleep
- Threads.sleep(1);
- }
+ try {
+ notEmpty.await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted when wait to load cache");
}
+ }
- if (getCacheCount() > 0) {
- return pollCache();
- }
- } while (!hasExecutedPrefetch);
-
- // if we exhausted this scanner before calling close, write out the scan metrics
- writeScanMetrics();
- return null;
+ Result result = pollCache();
+ if (prefetchCondition()) {
+ notFull.signalAll();
+ }
+ return result;
} finally {
+ lock.unlock();
handleException();
}
}
@Override
public void close() {
- if (!scanMetricsPublished) writeScanMetrics();
- closed = true;
- if (!isPrefetchRunning()) {
- if(closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) {
- super.close();
- }
- } // else do nothing since the async prefetch still needs this resources
- }
-
- @Override
- public int getCacheCount() {
- if(cache != null) {
- int size = cache.size();
- if(size > cacheCapacity) {
- cacheCapacity = size;
- }
- return size;
- } else {
- return 0;
+ try {
+ lock.lock();
+ super.close();
+ closed = true;
+ notFull.signalAll();
+ notEmpty.signalAll();
+ } finally {
+ lock.unlock();
}
}
@@ -182,44 +154,8 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
}
}
- private boolean isPrefetchRunning() {
- return prefetchRunning.get();
- }
-
- // double buffer - double cache size
- private int calcCacheCapacity() {
- int capacity = Integer.MAX_VALUE;
- if(caching > 0 && caching < (Integer.MAX_VALUE /2)) {
- capacity = caching * 2 + 1;
- }
- if(capacity == Integer.MAX_VALUE){
- if(maxScannerResultSize != Integer.MAX_VALUE) {
- capacity = (int) (maxScannerResultSize / ESTIMATED_SINGLE_RESULT_SIZE);
- }
- else {
- capacity = DEFAULT_QUEUE_CAPACITY;
- }
- }
- return Math.max(capacity, 1);
- }
-
private boolean prefetchCondition() {
- return
- (getCacheCount() < getCountThreshold()) &&
- (maxScannerResultSize == Long.MAX_VALUE ||
- getCacheSizeInBytes() < getSizeThreshold()) ;
- }
-
- private int getCountThreshold() {
- return Math.max(cacheCapacity / 2, 1);
- }
-
- private long getSizeThreshold() {
- return Math.max(maxScannerResultSize / 2, 1);
- }
-
- private long getCacheSizeInBytes() {
- return cacheSizeInBytes.get();
+ return cacheSizeInBytes.get() < maxCacheSize / 2;
}
private Result pollCache() {
@@ -233,21 +169,22 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
@Override
public void run() {
- boolean succeed = false;
- try {
- loadCache();
- succeed = true;
- } catch (Exception e) {
- exceptionsQueue.add(e);
- } finally {
- if (prefetchListener != null) {
- prefetchListener.accept(succeed);
- }
- prefetchRunning.set(false);
- if(closed) {
- if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) {
- // close was waiting for the prefetch to end
- close();
+ while (!closed) {
+ boolean succeed = false;
+ try {
+ lock.lock();
+ while (!prefetchCondition()) {
+ notFull.await();
+ }
+ loadCache();
+ succeed = true;
+ } catch (Exception e) {
+ exceptionsQueue.add(e);
+ } finally {
+ notEmpty.signalAll();
+ lock.unlock();
+ if (prefetchListener != null) {
+ prefetchListener.accept(succeed);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5915d73a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index d3b19e4..2522434 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -72,7 +72,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected Result lastResult = null;
protected final long maxScannerResultSize;
private final ClusterConnection connection;
- private final TableName tableName;
+ protected final TableName tableName;
protected final int scannerTimeout;
protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result[]> caller;
@@ -412,6 +412,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// This is possible if we just stopped at the boundary of a region in the previous call.
if (callable == null) {
if (!moveToNextRegion()) {
+ closed = true;
return;
}
}
@@ -478,7 +479,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
assert newLimit >= 0;
scan.setLimit(newLimit);
}
- if (scanExhausted(values)) {
+ if (scan.getLimit() == 0 || scanExhausted(values)) {
closeScanner();
closed = true;
break;
@@ -532,6 +533,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// we are done with the current region
if (regionExhausted) {
if (!moveToNextRegion()) {
+ closed = true;
break;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5915d73a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index ef00b24..43be573 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -754,6 +754,7 @@ public class TestScannersFromClientSide {
result = Result.create(kvListScan);
verifyResult(result, kvListExp, toLog, "Testing async scan");
}
+
TEST_UTIL.deleteTable(table);
}