You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/03/20 12:57:32 UTC
hbase git commit: HBASE-17691 Add ScanMetrics support for async scan
Repository: hbase
Updated Branches:
refs/heads/master 7c03a213f -> 5b4bb8217
HBASE-17691 Add ScanMetrics support for async scan
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5b4bb821
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5b4bb821
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5b4bb821
Branch: refs/heads/master
Commit: 5b4bb8217dd4327a89fa29c93ac37bc887d96c2c
Parents: 7c03a21
Author: zhangduo <zh...@apache.org>
Authored: Mon Mar 20 17:12:53 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Mar 20 20:54:04 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncClientScanner.java | 34 +++-
.../client/AsyncRpcRetryingCallerFactory.java | 24 ++-
.../AsyncScanSingleRegionRpcRetryingCaller.java | 35 ++--
.../hadoop/hbase/client/AsyncTableBase.java | 9 +-
.../hadoop/hbase/client/AsyncTableImpl.java | 1 +
.../hbase/client/AsyncTableResultScanner.java | 9 +-
.../hadoop/hbase/client/ClientScanner.java | 8 +-
.../hadoop/hbase/client/ConnectionUtils.java | 75 +++++++++
.../hbase/client/RawScanResultConsumer.java | 10 ++
.../hbase/client/ReversedScannerCallable.java | 10 +-
.../hadoop/hbase/client/ScanResultConsumer.java | 9 ++
.../hadoop/hbase/client/ScannerCallable.java | 88 ++--------
.../client/SimpleRawScanResultConsumer.java | 84 ++++++++++
.../hbase/client/SimpleScanResultConsumer.java | 75 +++++++++
.../hadoop/hbase/client/TestAsyncTableScan.java | 42 -----
.../hbase/client/TestAsyncTableScanMetrics.java | 159 +++++++++++++++++++
.../hbase/client/TestRawAsyncTableScan.java | 52 ------
17 files changed, 526 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index fa7aa81..2c1693d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -19,8 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -29,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@@ -51,6 +51,8 @@ class AsyncClientScanner {
// AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly.
private final Scan scan;
+ private final ScanMetrics scanMetrics;
+
private final RawScanResultConsumer consumer;
private final TableName tableName;
@@ -88,29 +90,46 @@ class AsyncClientScanner {
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
this.resultCache = createScanResultCache(scan);
+ if (scan.isScanMetricsEnabled()) {
+ this.scanMetrics = new ScanMetrics();
+ consumer.onScanMetricsCreated(scanMetrics);
+ } else {
+ this.scanMetrics = null;
+ }
}
private static final class OpenScannerResponse {
public final HRegionLocation loc;
+ public final boolean isRegionServerRemote;
+
public final ClientService.Interface stub;
public final HBaseRpcController controller;
public final ScanResponse resp;
- public OpenScannerResponse(HRegionLocation loc, Interface stub, HBaseRpcController controller,
- ScanResponse resp) {
+ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub,
+ HBaseRpcController controller, ScanResponse resp) {
this.loc = loc;
+ this.isRegionServerRemote = isRegionServerRemote;
this.stub = stub;
this.controller = controller;
this.resp = resp;
}
}
+ private int openScannerTries;
+
private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
+ boolean isRegionServerRemote = isRemote(loc.getHostname());
+ incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
+ if (openScannerTries > 1) {
+ incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
+ }
+ openScannerTries++;
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(),
@@ -120,7 +139,7 @@ class AsyncClientScanner {
future.completeExceptionally(controller.getFailed());
return;
}
- future.complete(new OpenScannerResponse(loc, stub, controller, resp));
+ future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
});
} catch (IOException e) {
future.completeExceptionally(e);
@@ -130,8 +149,9 @@ class AsyncClientScanner {
private void startScan(OpenScannerResponse resp) {
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
+ .remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
- .setScan(scan).consumer(consumer).resultCache(resultCache)
+ .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
@@ -149,6 +169,8 @@ class AsyncClientScanner {
}
private void openScanner() {
+ incRegionCountMetrics(scanMetrics);
+ openScannerTries = 1;
conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
.locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 08f52fc..d71b428 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.client;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import io.netty.util.HashedWheelTimer;
@@ -31,10 +31,10 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
-import org.apache.hadoop.ipc.ProtobufRpcEngine.Server;
/**
* Factory to create an AsyncRpcRetryCaller.
@@ -148,6 +148,8 @@ class AsyncRpcRetryingCallerFactory {
private Scan scan;
+ private ScanMetrics scanMetrics;
+
private ScanResultCache resultCache;
private RawScanResultConsumer consumer;
@@ -156,6 +158,8 @@ class AsyncRpcRetryingCallerFactory {
private HRegionLocation loc;
+ private boolean isRegionServerRemote;
+
private long scannerLeaseTimeoutPeriodNs;
private long scanTimeoutNs;
@@ -172,6 +176,16 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) {
+ this.scanMetrics = scanMetrics;
+ return this;
+ }
+
+ public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) {
+ this.isRegionServerRemote = isRegionServerRemote;
+ return this;
+ }
+
public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) {
this.resultCache = resultCache;
return this;
@@ -226,11 +240,11 @@ class AsyncRpcRetryingCallerFactory {
public AsyncScanSingleRegionRpcRetryingCaller build() {
checkArgument(scannerId >= 0, "invalid scannerId %d", scannerId);
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
- checkNotNull(scan, "scan is null"), scannerId,
+ checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
checkNotNull(resultCache, "resultCache is null"),
checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
- checkNotNull(loc, "location is null"), scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts,
- scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+ checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
+ pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index dd843ed..7ed6f03 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RawScanResultConsumer.ScanResumer;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -73,6 +74,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final Scan scan;
+ private final ScanMetrics scanMetrics;
+
private final long scannerId;
private final ScanResultCache resultCache;
@@ -83,6 +86,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final HRegionLocation loc;
+ private final boolean regionServerRemote;
+
private final long scannerLeaseTimeoutPeriodNs;
private final long pauseNs;
@@ -107,7 +112,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private long nextCallStartNs;
- private int tries = 1;
+ private int tries;
private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
@@ -279,17 +284,19 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
- AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache,
- RawScanResultConsumer consumer, Interface stub, HRegionLocation loc,
- long scannerLeaseTimeoutPeriodNs, long pauseNs, int maxAttempts, long scanTimeoutNs,
- long rpcTimeoutNs, int startLogErrorsCnt) {
+ AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId,
+ ScanResultCache resultCache, RawScanResultConsumer consumer, Interface stub,
+ HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs,
+ long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.scan = scan;
+ this.scanMetrics = scanMetrics;
this.scannerId = scannerId;
this.resultCache = resultCache;
this.consumer = consumer;
this.stub = stub;
this.loc = loc;
+ this.regionServerRemote = isRegionServerRemote;
this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
this.pauseNs = pauseNs;
this.maxAttempts = maxAttempts;
@@ -315,6 +322,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
private void closeScanner() {
+ incRPCCallsMetrics(scanMetrics, regionServerRemote);
resetController(controller, rpcTimeoutNs);
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
stub.scan(controller, req, resp -> {
@@ -345,6 +353,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
private void completeWhenError(boolean closeScanner) {
+ incRPCRetriesMetrics(scanMetrics, closeScanner);
resultCache.clear();
if (closeScanner) {
closeScanner();
@@ -449,12 +458,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
onError(controller.getFailed());
return;
}
+ updateServerSideMetrics(scanMetrics, resp);
boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
Result[] results;
try {
+ Result[] rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
+ updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
results = resultCache.addAndGet(
- Optional.ofNullable(ResponseConverter.getResults(controller.cellScanner(), resp))
- .orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
+ Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
isHeartbeatMessage);
} catch (IOException e) {
// We can not retry here. The server has responded normally and the call sequence has been
@@ -464,6 +475,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
completeWhenError(true);
return;
}
+
// calculate this before calling onNext as it is free for user to modify the result array in
// onNext.
int numberOfIndividualRows = numberOfIndividualRows(Arrays.asList(results));
@@ -510,6 +522,10 @@ class AsyncScanSingleRegionRpcRetryingCaller {
} else {
callTimeoutNs = 0L;
}
+ incRPCCallsMetrics(scanMetrics, regionServerRemote);
+ if (tries > 1) {
+ incRPCRetriesMetrics(scanMetrics, regionServerRemote);
+ }
resetController(controller, callTimeoutNs);
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
nextCallSeq, false, false, scan.getLimit());
@@ -518,13 +534,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void next() {
nextCallSeq++;
- tries = 0;
+ tries = 1;
exceptions.clear();
nextCallStartNs = System.nanoTime();
call();
}
private void renewLease() {
+ incRPCCallsMetrics(scanMetrics, regionServerRemote);
nextCallSeq++;
resetController(controller, rpcTimeoutNs);
ScanRequest req =
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index e201ab2..b5a251b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -322,7 +322,14 @@ public interface AsyncTableBase {
* If your result set is very large, you should use other scan method to get a scanner or use
* callback to process the results. They will do chunking to prevent OOM. The scanAll method will
* fetch all the results and store them in a List and then return the list to you.
- * @param scan A configured {@link Scan} object. SO if you use this method to fetch a really large
+ * <p>
+ * The scan metrics will be collected background if you enable it but you have no way to get it.
+ * Usually you can get scan metrics from {@code ResultScanner}, or through
+ * {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results.
+ * So if you really care about scan metrics then you'd better use other scan methods which return
+ * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no
+ * performance difference between these scan methods so do not worry.
+ * @param scan A configured {@link Scan} object. So if you use this method to fetch a really large
* result set, it is likely to cause OOM.
* @return The results of this small scan operation. The return value will be wrapped by a
* {@link CompletableFuture}.
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index f1625ad..29c0698 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -162,6 +162,7 @@ class AsyncTableImpl implements AsyncTable {
private void scan0(Scan scan, ScanResultConsumer consumer) {
try (ResultScanner scanner = getScanner(scan)) {
+ consumer.onScanMetricsCreated(scanner.getScanMetrics());
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
break;
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index eef797c..b6823f9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -48,6 +48,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
private final Queue<Result> queue = new ArrayDeque<>();
+ private ScanMetrics scanMetrics;
+
private long cacheSize;
private boolean closed = false;
@@ -110,6 +112,11 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
notifyAll();
}
+ @Override
+ public void onScanMetricsCreated(ScanMetrics scanMetrics) {
+ this.scanMetrics = scanMetrics;
+ }
+
private void resumePrefetch() {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching");
@@ -168,6 +175,6 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
@Override
public ScanMetrics getScanMetrics() {
- throw new UnsupportedOperationException();
+ return scanMetrics;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index a8b029f..8aa5c53 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
import com.google.common.annotations.VisibleForTesting;
@@ -250,9 +251,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool,
primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller);
this.callable.setCaching(this.caching);
- if (this.scanMetrics != null) {
- this.scanMetrics.countOfRegions.incrementAndGet();
- }
+ incRegionCountMetrics(scanMetrics);
return true;
}
@@ -460,7 +459,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Groom the array of Results that we received back from the server before adding that
// Results to the scanner's cache. If partial results are not allowed to be seen by the
// caller, all book keeping will be performed within this method.
- Result[] resultsToAddToCache = scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
+ Result[] resultsToAddToCache =
+ scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
if (resultsToAddToCache.length > 0) {
for (Result rs : resultsToAddToCache) {
cache.add(rs);
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 3e7cd00..f54f552 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -47,16 +47,20 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.DNS;
/**
* Utility used by client connections.
@@ -424,4 +428,75 @@ public final class ConnectionUtils {
return new CompleteScanResultCache();
}
}
+
+ private static final String MY_ADDRESS = getMyAddress();
+
+ private static String getMyAddress() {
+ try {
+ return DNS.getDefaultHost("default", "default");
+ } catch (UnknownHostException uhe) {
+ LOG.error("cannot determine my address", uhe);
+ return null;
+ }
+ }
+
+ static boolean isRemote(String host) {
+ return !host.equalsIgnoreCase(MY_ADDRESS);
+ }
+
+ static void incRPCCallsMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) {
+ if (scanMetrics == null) {
+ return;
+ }
+ scanMetrics.countOfRPCcalls.incrementAndGet();
+ if (isRegionServerRemote) {
+ scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
+ }
+ }
+
+ static void incRPCRetriesMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) {
+ if (scanMetrics == null) {
+ return;
+ }
+ scanMetrics.countOfRPCRetries.incrementAndGet();
+ if (isRegionServerRemote) {
+ scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
+ }
+ }
+
+ static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs,
+ boolean isRegionServerRemote) {
+ if (scanMetrics == null || rrs == null || rrs.length == 0) {
+ return;
+ }
+ long resultSize = 0;
+ for (Result rr : rrs) {
+ for (Cell cell : rr.rawCells()) {
+ resultSize += CellUtil.estimatedSerializedSizeOf(cell);
+ }
+ }
+ scanMetrics.countOfBytesInResults.addAndGet(resultSize);
+ if (isRegionServerRemote) {
+ scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
+ }
+ }
+
+ /**
+ * Use the scan metrics returned by the server to add to the identically named counters in the
+ * client side metrics. If a counter does not exist with the same name as the server side metric,
+ * the attempt to increase the counter will fail.
+ */
+ static void updateServerSideMetrics(ScanMetrics scanMetrics, ScanResponse response) {
+ if (scanMetrics == null || response == null || !response.hasScanMetrics()) {
+ return;
+ }
+ ResponseConverter.getScanMetrics(response).forEach(scanMetrics::addToCounter);
+ }
+
+ static void incRegionCountMetrics(ScanMetrics scanMetrics) {
+ if (scanMetrics == null) {
+ return;
+ }
+ scanMetrics.countOfRegions.incrementAndGet();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
index 17e0afa..899c0bb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
/**
* Receives {@link Result} for an asynchronous scan.
@@ -112,4 +113,13 @@ public interface RawScanResultConsumer {
* Indicate that the scan operation is completed normally.
*/
void onComplete();
+
+ /**
+ * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
+ * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
+ * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can
+ * store it somewhere to get the metrics at any time if you want.
+ */
+ default void onScanMetricsCreated(ScanMetrics scanMetrics) {
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index 1d46ab4..538fe30 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -18,7 +18,8 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
import java.io.IOException;
@@ -113,11 +114,8 @@ public class ReversedScannerCallable extends ScannerCallable {
}
// check how often we retry.
- if (reload && this.scanMetrics != null) {
- this.scanMetrics.countOfRPCRetries.incrementAndGet();
- if (isRegionServerRemote) {
- this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
- }
+ if (reload) {
+ incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
index 770a87f..03b1ba0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
/**
* Receives {@link Result} for an asynchronous scan.
@@ -45,4 +46,12 @@ public interface ScanResultConsumer {
*/
void onComplete();
+ /**
+ * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
+ * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
+ * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can
+ * store it somewhere to get the metrics at any time if you want.
+ */
+ default void onScanMetricsCreated(ScanMetrics scanMetrics) {
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 0682a7a..ffac566 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -18,17 +18,18 @@
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
+
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.net.UnknownHostException;
-import java.util.Map;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -48,7 +49,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
-import org.apache.hadoop.net.DNS;
/**
* Scanner operations such as create, next, etc.
@@ -72,7 +72,6 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
protected ScanMetrics scanMetrics;
private boolean logScannerActivity = false;
private int logCutOffLatency = 1000;
- private static String myAddress;
protected final int id;
enum MoreResults {
@@ -87,13 +86,6 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
* Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
*/
protected boolean heartbeatMessage = false;
- static {
- try {
- myAddress = DNS.getDefaultHost("default", "default");
- } catch (UnknownHostException uhe) {
- LOG.error("cannot determine my address", uhe);
- }
- }
// indicate if it is a remote server call
protected boolean isRegionServerRemote = true;
@@ -158,30 +150,23 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
}
// check how often we retry.
- if (reload && this.scanMetrics != null) {
- this.scanMetrics.countOfRPCRetries.incrementAndGet();
- if (isRegionServerRemote) {
- this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
- }
+ if (reload) {
+ incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
}
/**
- * compare the local machine hostname with region server's hostname
- * to decide if hbase client connects to a remote region server
+ * compare the local machine hostname with region server's hostname to decide if hbase client
+ * connects to a remote region server
*/
protected void checkIfRegionServerIsRemote() {
- if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
- isRegionServerRemote = false;
- } else {
- isRegionServerRemote = true;
- }
+ isRegionServerRemote = isRemote(getLocation().getHostname());
}
private ScanResponse next() throws IOException {
// Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
setHeartbeatMessage(false);
- incRPCcallsMetrics();
+ incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
this.scanMetrics != null, renew, scan.getLimit());
try {
@@ -267,7 +252,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
+ scannerId);
}
}
- updateServerSideMetrics(response);
+ updateServerSideMetrics(scanMetrics, response);
// moreResults is only used for the case where a filter exhausts all elements
if (response.hasMoreResults()) {
if (response.getMoreResults()) {
@@ -289,7 +274,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
} else {
setMoreResultsInRegion(MoreResults.UNKNOWN);
}
- updateResultsMetrics(rrs);
+ updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
return rrs;
}
@@ -307,53 +292,12 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
this.heartbeatMessage = heartbeatMessage;
}
- private void incRPCcallsMetrics() {
- if (this.scanMetrics == null) {
- return;
- }
- this.scanMetrics.countOfRPCcalls.incrementAndGet();
- if (isRegionServerRemote) {
- this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
- }
- }
-
- protected void updateResultsMetrics(Result[] rrs) {
- if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
- return;
- }
- long resultSize = 0;
- for (Result rr : rrs) {
- for (Cell cell : rr.rawCells()) {
- resultSize += CellUtil.estimatedSerializedSizeOf(cell);
- }
- }
- this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
- if (isRegionServerRemote) {
- this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
- }
- }
-
- /**
- * Use the scan metrics returned by the server to add to the identically named counters in the
- * client side metrics. If a counter does not exist with the same name as the server side metric,
- * the attempt to increase the counter will fail.
- * @param response
- */
- private void updateServerSideMetrics(ScanResponse response) {
- if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;
-
- Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
- for (Entry<String, Long> entry : serverMetrics.entrySet()) {
- this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
- }
- }
-
private void close() {
if (this.scannerId == -1L) {
return;
}
try {
- incRPCcallsMetrics();
+ incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
ScanRequest request =
RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
try {
@@ -371,7 +315,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
}
private ScanResponse openScanner() throws IOException {
- incRPCcallsMetrics();
+ incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
ScanRequest request = RequestConverter.buildScanRequest(
getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java
new file mode 100644
index 0000000..026a21f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+
+class SimpleRawScanResultConsumer implements RawScanResultConsumer {
+
+ private ScanMetrics scanMetrics;
+
+ private final Queue<Result> queue = new ArrayDeque<>();
+
+ private boolean finished;
+
+ private Throwable error;
+
+ @Override
+ public void onScanMetricsCreated(ScanMetrics scanMetrics) {
+ this.scanMetrics = scanMetrics;
+ }
+
+ @Override
+ public synchronized void onNext(Result[] results, ScanController controller) {
+ for (Result result : results) {
+ queue.offer(result);
+ }
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void onError(Throwable error) {
+ finished = true;
+ this.error = error;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void onComplete() {
+ finished = true;
+ notifyAll();
+ }
+
+ public synchronized Result take() throws IOException, InterruptedException {
+ for (;;) {
+ if (!queue.isEmpty()) {
+ return queue.poll();
+ }
+ if (finished) {
+ if (error != null) {
+ Throwables.propagateIfPossible(error, IOException.class);
+ throw new IOException(error);
+ } else {
+ return null;
+ }
+ }
+ wait();
+ }
+ }
+
+ public ScanMetrics getScanMetrics() {
+ return scanMetrics;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java
new file mode 100644
index 0000000..168129d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.base.Throwables;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+
+final class SimpleScanResultConsumer implements ScanResultConsumer {
+
+ private ScanMetrics scanMetrics;
+
+ private final List<Result> results = new ArrayList<>();
+
+ private Throwable error;
+
+ private boolean finished = false;
+
+ @Override
+ public void onScanMetricsCreated(ScanMetrics scanMetrics) {
+ this.scanMetrics = scanMetrics;
+ }
+
+ @Override
+ public synchronized boolean onNext(Result result) {
+ results.add(result);
+ return true;
+ }
+
+ @Override
+ public synchronized void onError(Throwable error) {
+ this.error = error;
+ finished = true;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void onComplete() {
+ finished = true;
+ notifyAll();
+ }
+
+ public synchronized List<Result> getAll() throws Exception {
+ while (!finished) {
+ wait();
+ }
+ if (error != null) {
+ Throwables.propagateIfPossible(error, Exception.class);
+ throw new Exception(error);
+ }
+ return results;
+ }
+
+ public ScanMetrics getScanMetrics() {
+ return scanMetrics;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
index a8aad0b..2e64593 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
@@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.client;
-import com.google.common.base.Throwables;
-
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
@@ -37,45 +34,6 @@ import org.junit.runners.Parameterized.Parameters;
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
- private static final class SimpleScanResultConsumer implements ScanResultConsumer {
-
- private final List<Result> results = new ArrayList<>();
-
- private Throwable error;
-
- private boolean finished = false;
-
- @Override
- public synchronized boolean onNext(Result result) {
- results.add(result);
- return true;
- }
-
- @Override
- public synchronized void onError(Throwable error) {
- this.error = error;
- finished = true;
- notifyAll();
- }
-
- @Override
- public synchronized void onComplete() {
- finished = true;
- notifyAll();
- }
-
- public synchronized List<Result> getAll() throws Exception {
- while (!finished) {
- wait();
- }
- if (error != null) {
- Throwables.propagateIfPossible(error, Exception.class);
- throw new Exception(error);
- }
- return results;
- }
- }
-
@Parameter(0)
public String scanType;
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
new file mode 100644
index 0000000..b877dac
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScanMetrics {
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static final TableName TABLE_NAME = TableName.valueOf("ScanMetrics");
+
+ private static final byte[] CF = Bytes.toBytes("cf");
+
+ private static final byte[] CQ = Bytes.toBytes("cq");
+
+ private static final byte[] VALUE = Bytes.toBytes("value");
+
+ private static AsyncConnection CONN;
+
+ private static int NUM_REGIONS;
+
+ @FunctionalInterface
+ private interface ScanWithMetrics {
+ Pair<List<Result>, ScanMetrics> scan(Scan scan) throws Exception;
+ }
+
+ @Parameter(0)
+ public String methodName;
+
+ @Parameter(1)
+ public ScanWithMetrics method;
+
+ @Parameters(name = "{index}: scan={0}")
+ public static List<Object[]> params() {
+ ScanWithMetrics doScanWithRawAsyncTable = TestAsyncTableScanMetrics::doScanWithRawAsyncTable;
+ ScanWithMetrics doScanWithAsyncTableScan = TestAsyncTableScanMetrics::doScanWithAsyncTableScan;
+ ScanWithMetrics doScanWithAsyncTableScanner =
+ TestAsyncTableScanMetrics::doScanWithAsyncTableScanner;
+ return Arrays.asList(new Object[] { "doScanWithRawAsyncTable", doScanWithRawAsyncTable },
+ new Object[] { "doScanWithAsyncTableScan", doScanWithAsyncTableScan },
+ new Object[] { "doScanWithAsyncTableScanner", doScanWithAsyncTableScanner });
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL.startMiniCluster(3);
+ // Create 3 rows in the table, with rowkeys starting with "zzz*" so that
+ // scan are forced to hit all the regions.
+ try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
+ table.put(Arrays.asList(new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE),
+ new Put(Bytes.toBytes("zzz2")).addColumn(CF, CQ, VALUE),
+ new Put(Bytes.toBytes("zzz3")).addColumn(CF, CQ, VALUE)));
+ }
+ CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+ NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ IOUtils.closeQuietly(CONN);
+ UTIL.shutdownMiniCluster();
+ }
+
+ private static Pair<List<Result>, ScanMetrics> doScanWithRawAsyncTable(Scan scan)
+ throws IOException, InterruptedException {
+ SimpleRawScanResultConsumer consumer = new SimpleRawScanResultConsumer();
+ CONN.getRawTable(TABLE_NAME).scan(scan, consumer);
+ List<Result> results = new ArrayList<>();
+ for (Result result; (result = consumer.take()) != null;) {
+ results.add(result);
+ }
+ return Pair.newPair(results, consumer.getScanMetrics());
+ }
+
+ private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScan(Scan scan)
+ throws Exception {
+ SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
+ CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer);
+ return Pair.newPair(consumer.getAll(), consumer.getScanMetrics());
+ }
+
+ private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScanner(Scan scan)
+ throws IOException {
+ try (ResultScanner scanner =
+ CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan)) {
+ List<Result> results = new ArrayList<>();
+ for (Result result; (result = scanner.next()) != null;) {
+ results.add(result);
+ }
+ return Pair.newPair(results, scanner.getScanMetrics());
+ }
+ }
+
+ @Test
+ public void testNoScanMetrics() throws Exception {
+ Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
+ assertEquals(3, pair.getFirst().size());
+ assertNull(pair.getSecond());
+ }
+
+ @Test
+ public void testScanMetrics() throws Exception {
+ Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan().setScanMetricsEnabled(true));
+ List<Result> results = pair.getFirst();
+ assertEquals(3, results.size());
+ long bytes = results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
+ .mapToLong(c -> CellUtil.estimatedSerializedSizeOf(c)).sum();
+ ScanMetrics scanMetrics = pair.getSecond();
+ assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+ assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+ assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
+ // also assert a server side metric to ensure that we have published them into the client side
+ // metrics.
+ assertEquals(3, scanMetrics.countOfRowsScanned.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
index 72179c8..5311ca2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
@@ -17,13 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
-import com.google.common.base.Throwables;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
-import java.util.Queue;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -39,53 +34,6 @@ import org.junit.runners.Parameterized.Parameters;
@Category({ MediumTests.class, ClientTests.class })
public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
- private static final class SimpleRawScanResultConsumer implements RawScanResultConsumer {
-
- private final Queue<Result> queue = new ArrayDeque<>();
-
- private boolean finished;
-
- private Throwable error;
-
- @Override
- public synchronized void onNext(Result[] results, ScanController controller) {
- for (Result result : results) {
- queue.offer(result);
- }
- notifyAll();
- }
-
- @Override
- public synchronized void onError(Throwable error) {
- finished = true;
- this.error = error;
- notifyAll();
- }
-
- @Override
- public synchronized void onComplete() {
- finished = true;
- notifyAll();
- }
-
- public synchronized Result take() throws IOException, InterruptedException {
- for (;;) {
- if (!queue.isEmpty()) {
- return queue.poll();
- }
- if (finished) {
- if (error != null) {
- Throwables.propagateIfPossible(error, IOException.class);
- throw new IOException(error);
- } else {
- return null;
- }
- }
- wait();
- }
- }
- }
-
@Parameter(0)
public String scanType;