You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/03/20 12:57:32 UTC

hbase git commit: HBASE-17691 Add ScanMetrics support for async scan

Repository: hbase
Updated Branches:
  refs/heads/master 7c03a213f -> 5b4bb8217


HBASE-17691 Add ScanMetrics support for async scan


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5b4bb821
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5b4bb821
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5b4bb821

Branch: refs/heads/master
Commit: 5b4bb8217dd4327a89fa29c93ac37bc887d96c2c
Parents: 7c03a21
Author: zhangduo <zh...@apache.org>
Authored: Mon Mar 20 17:12:53 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Mar 20 20:54:04 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncClientScanner.java |  34 +++-
 .../client/AsyncRpcRetryingCallerFactory.java   |  24 ++-
 .../AsyncScanSingleRegionRpcRetryingCaller.java |  35 ++--
 .../hadoop/hbase/client/AsyncTableBase.java     |   9 +-
 .../hadoop/hbase/client/AsyncTableImpl.java     |   1 +
 .../hbase/client/AsyncTableResultScanner.java   |   9 +-
 .../hadoop/hbase/client/ClientScanner.java      |   8 +-
 .../hadoop/hbase/client/ConnectionUtils.java    |  75 +++++++++
 .../hbase/client/RawScanResultConsumer.java     |  10 ++
 .../hbase/client/ReversedScannerCallable.java   |  10 +-
 .../hadoop/hbase/client/ScanResultConsumer.java |   9 ++
 .../hadoop/hbase/client/ScannerCallable.java    |  88 ++--------
 .../client/SimpleRawScanResultConsumer.java     |  84 ++++++++++
 .../hbase/client/SimpleScanResultConsumer.java  |  75 +++++++++
 .../hadoop/hbase/client/TestAsyncTableScan.java |  42 -----
 .../hbase/client/TestAsyncTableScanMetrics.java | 159 +++++++++++++++++++
 .../hbase/client/TestRawAsyncTableScan.java     |  52 ------
 17 files changed, 526 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index fa7aa81..2c1693d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -19,8 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
@@ -29,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@@ -51,6 +51,8 @@ class AsyncClientScanner {
   // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly.
   private final Scan scan;
 
+  private final ScanMetrics scanMetrics;
+
   private final RawScanResultConsumer consumer;
 
   private final TableName tableName;
@@ -88,29 +90,46 @@ class AsyncClientScanner {
     this.rpcTimeoutNs = rpcTimeoutNs;
     this.startLogErrorsCnt = startLogErrorsCnt;
     this.resultCache = createScanResultCache(scan);
+    if (scan.isScanMetricsEnabled()) {
+      this.scanMetrics = new ScanMetrics();
+      consumer.onScanMetricsCreated(scanMetrics);
+    } else {
+      this.scanMetrics = null;
+    }
   }
 
   private static final class OpenScannerResponse {
 
     public final HRegionLocation loc;
 
+    public final boolean isRegionServerRemote;
+
     public final ClientService.Interface stub;
 
     public final HBaseRpcController controller;
 
     public final ScanResponse resp;
 
-    public OpenScannerResponse(HRegionLocation loc, Interface stub, HBaseRpcController controller,
-        ScanResponse resp) {
+    public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub,
+        HBaseRpcController controller, ScanResponse resp) {
       this.loc = loc;
+      this.isRegionServerRemote = isRegionServerRemote;
       this.stub = stub;
       this.controller = controller;
       this.resp = resp;
     }
   }
 
+  private int openScannerTries;
+
   private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
       HRegionLocation loc, ClientService.Interface stub) {
+    boolean isRegionServerRemote = isRemote(loc.getHostname());
+    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
+    if (openScannerTries > 1) {
+      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
+    }
+    openScannerTries++;
     CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
     try {
       ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(),
@@ -120,7 +139,7 @@ class AsyncClientScanner {
           future.completeExceptionally(controller.getFailed());
           return;
         }
-        future.complete(new OpenScannerResponse(loc, stub, controller, resp));
+        future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
       });
     } catch (IOException e) {
       future.completeExceptionally(e);
@@ -130,8 +149,9 @@ class AsyncClientScanner {
 
   private void startScan(OpenScannerResponse resp) {
     conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
+        .remote(resp.isRegionServerRemote)
         .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
-        .setScan(scan).consumer(consumer).resultCache(resultCache)
+        .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
         .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
         .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
         .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
@@ -149,6 +169,8 @@ class AsyncClientScanner {
   }
 
   private void openScanner() {
+    incRegionCountMetrics(scanMetrics);
+    openScannerTries = 1;
     conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
         .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
         .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 08f52fc..d71b428 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 
 import io.netty.util.HashedWheelTimer;
 
@@ -31,10 +31,10 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
-import org.apache.hadoop.ipc.ProtobufRpcEngine.Server;
 
 /**
  * Factory to create an AsyncRpcRetryCaller.
@@ -148,6 +148,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private Scan scan;
 
+    private ScanMetrics scanMetrics;
+
     private ScanResultCache resultCache;
 
     private RawScanResultConsumer consumer;
@@ -156,6 +158,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private HRegionLocation loc;
 
+    private boolean isRegionServerRemote;
+
     private long scannerLeaseTimeoutPeriodNs;
 
     private long scanTimeoutNs;
@@ -172,6 +176,16 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public ScanSingleRegionCallerBuilder metrics(ScanMetrics scanMetrics) {
+      this.scanMetrics = scanMetrics;
+      return this;
+    }
+
+    public ScanSingleRegionCallerBuilder remote(boolean isRegionServerRemote) {
+      this.isRegionServerRemote = isRegionServerRemote;
+      return this;
+    }
+
     public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) {
       this.resultCache = resultCache;
       return this;
@@ -226,11 +240,11 @@ class AsyncRpcRetryingCallerFactory {
     public AsyncScanSingleRegionRpcRetryingCaller build() {
       checkArgument(scannerId >= 0, "invalid scannerId %d", scannerId);
       return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
-          checkNotNull(scan, "scan is null"), scannerId,
+          checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
           checkNotNull(resultCache, "resultCache is null"),
           checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
-          checkNotNull(loc, "location is null"), scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts,
-          scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+          checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
+          pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index dd843ed..7ed6f03 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.RawScanResultConsumer.ScanResumer;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -73,6 +74,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private final Scan scan;
 
+  private final ScanMetrics scanMetrics;
+
   private final long scannerId;
 
   private final ScanResultCache resultCache;
@@ -83,6 +86,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private final HRegionLocation loc;
 
+  private final boolean regionServerRemote;
+
   private final long scannerLeaseTimeoutPeriodNs;
 
   private final long pauseNs;
@@ -107,7 +112,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private long nextCallStartNs;
 
-  private int tries = 1;
+  private int tries;
 
   private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
 
@@ -279,17 +284,19 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   }
 
   public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
-      AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache,
-      RawScanResultConsumer consumer, Interface stub, HRegionLocation loc,
-      long scannerLeaseTimeoutPeriodNs, long pauseNs, int maxAttempts, long scanTimeoutNs,
-      long rpcTimeoutNs, int startLogErrorsCnt) {
+      AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId,
+      ScanResultCache resultCache, RawScanResultConsumer consumer, Interface stub,
+      HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs,
+      long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.scan = scan;
+    this.scanMetrics = scanMetrics;
     this.scannerId = scannerId;
     this.resultCache = resultCache;
     this.consumer = consumer;
     this.stub = stub;
     this.loc = loc;
+    this.regionServerRemote = isRegionServerRemote;
     this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
     this.pauseNs = pauseNs;
     this.maxAttempts = maxAttempts;
@@ -315,6 +322,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   }
 
   private void closeScanner() {
+    incRPCCallsMetrics(scanMetrics, regionServerRemote);
     resetController(controller, rpcTimeoutNs);
     ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
     stub.scan(controller, req, resp -> {
@@ -345,6 +353,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   }
 
   private void completeWhenError(boolean closeScanner) {
+    incRPCRetriesMetrics(scanMetrics, closeScanner);
     resultCache.clear();
     if (closeScanner) {
       closeScanner();
@@ -449,12 +458,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       onError(controller.getFailed());
       return;
     }
+    updateServerSideMetrics(scanMetrics, resp);
     boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
     Result[] results;
     try {
+      Result[] rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
+      updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
       results = resultCache.addAndGet(
-        Optional.ofNullable(ResponseConverter.getResults(controller.cellScanner(), resp))
-            .orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
+        Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
         isHeartbeatMessage);
     } catch (IOException e) {
       // We can not retry here. The server has responded normally and the call sequence has been
@@ -464,6 +475,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       completeWhenError(true);
       return;
     }
+
     // calculate this before calling onNext as it is free for user to modify the result array in
     // onNext.
     int numberOfIndividualRows = numberOfIndividualRows(Arrays.asList(results));
@@ -510,6 +522,10 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     } else {
       callTimeoutNs = 0L;
     }
+    incRPCCallsMetrics(scanMetrics, regionServerRemote);
+    if (tries > 1) {
+      incRPCRetriesMetrics(scanMetrics, regionServerRemote);
+    }
     resetController(controller, callTimeoutNs);
     ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
       nextCallSeq, false, false, scan.getLimit());
@@ -518,13 +534,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private void next() {
     nextCallSeq++;
-    tries = 0;
+    tries = 1;
     exceptions.clear();
     nextCallStartNs = System.nanoTime();
     call();
   }
 
   private void renewLease() {
+    incRPCCallsMetrics(scanMetrics, regionServerRemote);
     nextCallSeq++;
     resetController(controller, rpcTimeoutNs);
     ScanRequest req =

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index e201ab2..b5a251b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -322,7 +322,14 @@ public interface AsyncTableBase {
    * If your result set is very large, you should use other scan method to get a scanner or use
    * callback to process the results. They will do chunking to prevent OOM. The scanAll method will
    * fetch all the results and store them in a List and then return the list to you.
-   * @param scan A configured {@link Scan} object. SO if you use this method to fetch a really large
+   * <p>
+   * The scan metrics will be collected background if you enable it but you have no way to get it.
+   * Usually you can get scan metrics from {@code ResultScanner}, or through
+   * {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results.
+   * So if you really care about scan metrics then you'd better use other scan methods which return
+   * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no
+   * performance difference between these scan methods so do not worry.
+   * @param scan A configured {@link Scan} object. So if you use this method to fetch a really large
    *          result set, it is likely to cause OOM.
    * @return The results of this small scan operation. The return value will be wrapped by a
    *         {@link CompletableFuture}.

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index f1625ad..29c0698 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -162,6 +162,7 @@ class AsyncTableImpl implements AsyncTable {
 
   private void scan0(Scan scan, ScanResultConsumer consumer) {
     try (ResultScanner scanner = getScanner(scan)) {
+      consumer.onScanMetricsCreated(scanner.getScanMetrics());
       for (Result result; (result = scanner.next()) != null;) {
         if (!consumer.onNext(result)) {
           break;

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index eef797c..b6823f9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -48,6 +48,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
 
   private final Queue<Result> queue = new ArrayDeque<>();
 
+  private ScanMetrics scanMetrics;
+
   private long cacheSize;
 
   private boolean closed = false;
@@ -110,6 +112,11 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
     notifyAll();
   }
 
+  @Override
+  public void onScanMetricsCreated(ScanMetrics scanMetrics) {
+    this.scanMetrics = scanMetrics;
+  }
+
   private void resumePrefetch() {
     if (LOG.isDebugEnabled()) {
       LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching");
@@ -168,6 +175,6 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
 
   @Override
   public ScanMetrics getScanMetrics() {
-    throw new UnsupportedOperationException();
+    return scanMetrics;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index a8b029f..8aa5c53 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -250,9 +251,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
         new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool,
             primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller);
     this.callable.setCaching(this.caching);
-    if (this.scanMetrics != null) {
-      this.scanMetrics.countOfRegions.incrementAndGet();
-    }
+    incRegionCountMetrics(scanMetrics);
     return true;
   }
 
@@ -460,7 +459,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
       // Groom the array of Results that we received back from the server before adding that
       // Results to the scanner's cache. If partial results are not allowed to be seen by the
       // caller, all book keeping will be performed within this method.
-      Result[] resultsToAddToCache = scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
+      Result[] resultsToAddToCache =
+          scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
       if (resultsToAddToCache.length > 0) {
         for (Result rs : resultsToAddToCache) {
           cache.add(rs);

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 3e7cd00..f54f552 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -47,16 +47,20 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.DNS;
 
 /**
  * Utility used by client connections.
@@ -424,4 +428,75 @@ public final class ConnectionUtils {
       return new CompleteScanResultCache();
     }
   }
+
+  private static final String MY_ADDRESS = getMyAddress();
+
+  private static String getMyAddress() {
+    try {
+      return DNS.getDefaultHost("default", "default");
+    } catch (UnknownHostException uhe) {
+      LOG.error("cannot determine my address", uhe);
+      return null;
+    }
+  }
+
+  static boolean isRemote(String host) {
+    return !host.equalsIgnoreCase(MY_ADDRESS);
+  }
+
+  static void incRPCCallsMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) {
+    if (scanMetrics == null) {
+      return;
+    }
+    scanMetrics.countOfRPCcalls.incrementAndGet();
+    if (isRegionServerRemote) {
+      scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
+    }
+  }
+
+  static void incRPCRetriesMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) {
+    if (scanMetrics == null) {
+      return;
+    }
+    scanMetrics.countOfRPCRetries.incrementAndGet();
+    if (isRegionServerRemote) {
+      scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
+    }
+  }
+
+  static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs,
+      boolean isRegionServerRemote) {
+    if (scanMetrics == null || rrs == null || rrs.length == 0) {
+      return;
+    }
+    long resultSize = 0;
+    for (Result rr : rrs) {
+      for (Cell cell : rr.rawCells()) {
+        resultSize += CellUtil.estimatedSerializedSizeOf(cell);
+      }
+    }
+    scanMetrics.countOfBytesInResults.addAndGet(resultSize);
+    if (isRegionServerRemote) {
+      scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
+    }
+  }
+
+  /**
+   * Use the scan metrics returned by the server to add to the identically named counters in the
+   * client side metrics. If a counter does not exist with the same name as the server side metric,
+   * the attempt to increase the counter will fail.
+   */
+  static void updateServerSideMetrics(ScanMetrics scanMetrics, ScanResponse response) {
+    if (scanMetrics == null || response == null || !response.hasScanMetrics()) {
+      return;
+    }
+    ResponseConverter.getScanMetrics(response).forEach(scanMetrics::addToCounter);
+  }
+
+  static void incRegionCountMetrics(ScanMetrics scanMetrics) {
+    if (scanMetrics == null) {
+      return;
+    }
+    scanMetrics.countOfRegions.incrementAndGet();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
index 17e0afa..899c0bb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 
 /**
  * Receives {@link Result} for an asynchronous scan.
@@ -112,4 +113,13 @@ public interface RawScanResultConsumer {
    * Indicate that the scan operation is completed normally.
    */
   void onComplete();
+
+  /**
+   * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
+   * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
+   * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can
+   * store it somewhere to get the metrics at any time if you want.
+   */
+  default void onScanMetricsCreated(ScanMetrics scanMetrics) {
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index 1d46ab4..538fe30 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -18,7 +18,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
 
 import java.io.IOException;
@@ -113,11 +114,8 @@ public class ReversedScannerCallable extends ScannerCallable {
     }
 
     // check how often we retry.
-    if (reload && this.scanMetrics != null) {
-      this.scanMetrics.countOfRPCRetries.incrementAndGet();
-      if (isRegionServerRemote) {
-        this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
-      }
+    if (reload) {
+      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
index 770a87f..03b1ba0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 
 /**
  * Receives {@link Result} for an asynchronous scan.
@@ -45,4 +46,12 @@ public interface ScanResultConsumer {
    */
   void onComplete();
 
+  /**
+   * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
+   * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
+   * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can
+   * store it somewhere to get the metrics at any time if you want.
+   */
+  default void onScanMetricsCreated(ScanMetrics scanMetrics) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 0682a7a..ffac566 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -18,17 +18,18 @@
 
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.net.UnknownHostException;
-import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -48,7 +49,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
-import org.apache.hadoop.net.DNS;
 
 /**
  * Scanner operations such as create, next, etc.
@@ -72,7 +72,6 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
   protected ScanMetrics scanMetrics;
   private boolean logScannerActivity = false;
   private int logCutOffLatency = 1000;
-  private static String myAddress;
   protected final int id;
 
   enum MoreResults {
@@ -87,13 +86,6 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
    * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
    */
   protected boolean heartbeatMessage = false;
-  static {
-    try {
-      myAddress = DNS.getDefaultHost("default", "default");
-    } catch (UnknownHostException uhe) {
-      LOG.error("cannot determine my address", uhe);
-    }
-  }
 
   // indicate if it is a remote server call
   protected boolean isRegionServerRemote = true;
@@ -158,30 +150,23 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
     }
 
     // check how often we retry.
-    if (reload && this.scanMetrics != null) {
-      this.scanMetrics.countOfRPCRetries.incrementAndGet();
-      if (isRegionServerRemote) {
-        this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
-      }
+    if (reload) {
+      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
     }
   }
 
   /**
-   * compare the local machine hostname with region server's hostname
-   * to decide if hbase client connects to a remote region server
+   * compare the local machine hostname with region server's hostname to decide if hbase client
+   * connects to a remote region server
    */
   protected void checkIfRegionServerIsRemote() {
-    if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
-      isRegionServerRemote = false;
-    } else {
-      isRegionServerRemote = true;
-    }
+    isRegionServerRemote = isRemote(getLocation().getHostname());
   }
 
   private ScanResponse next() throws IOException {
     // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
     setHeartbeatMessage(false);
-    incRPCcallsMetrics();
+    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
     ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
       this.scanMetrics != null, renew, scan.getLimit());
     try {
@@ -267,7 +252,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
             + scannerId);
       }
     }
-    updateServerSideMetrics(response);
+    updateServerSideMetrics(scanMetrics, response);
     // moreResults is only used for the case where a filter exhausts all elements
     if (response.hasMoreResults()) {
       if (response.getMoreResults()) {
@@ -289,7 +274,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
     } else {
       setMoreResultsInRegion(MoreResults.UNKNOWN);
     }
-    updateResultsMetrics(rrs);
+    updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
     return rrs;
   }
 
@@ -307,53 +292,12 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
     this.heartbeatMessage = heartbeatMessage;
   }
 
-  private void incRPCcallsMetrics() {
-    if (this.scanMetrics == null) {
-      return;
-    }
-    this.scanMetrics.countOfRPCcalls.incrementAndGet();
-    if (isRegionServerRemote) {
-      this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
-    }
-  }
-
-  protected void updateResultsMetrics(Result[] rrs) {
-    if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
-      return;
-    }
-    long resultSize = 0;
-    for (Result rr : rrs) {
-      for (Cell cell : rr.rawCells()) {
-        resultSize += CellUtil.estimatedSerializedSizeOf(cell);
-      }
-    }
-    this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
-    if (isRegionServerRemote) {
-      this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
-    }
-  }
-
-  /**
-   * Use the scan metrics returned by the server to add to the identically named counters in the
-   * client side metrics. If a counter does not exist with the same name as the server side metric,
-   * the attempt to increase the counter will fail.
-   * @param response
-   */
-  private void updateServerSideMetrics(ScanResponse response) {
-    if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;
-
-    Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
-    for (Entry<String, Long> entry : serverMetrics.entrySet()) {
-      this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
-    }
-  }
-
   private void close() {
     if (this.scannerId == -1L) {
       return;
     }
     try {
-      incRPCcallsMetrics();
+      incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
       ScanRequest request =
           RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
       try {
@@ -371,7 +315,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
   }
 
   private ScanResponse openScanner() throws IOException {
-    incRPCcallsMetrics();
+    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
     ScanRequest request = RequestConverter.buildScanRequest(
       getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java
new file mode 100644
index 0000000..026a21f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+
+class SimpleRawScanResultConsumer implements RawScanResultConsumer {
+
+  private ScanMetrics scanMetrics;
+
+  private final Queue<Result> queue = new ArrayDeque<>();
+
+  private boolean finished;
+
+  private Throwable error;
+
+  @Override
+  public void onScanMetricsCreated(ScanMetrics scanMetrics) {
+    this.scanMetrics = scanMetrics;
+  }
+
+  @Override
+  public synchronized void onNext(Result[] results, ScanController controller) {
+    for (Result result : results) {
+      queue.offer(result);
+    }
+    notifyAll();
+  }
+
+  @Override
+  public synchronized void onError(Throwable error) {
+    finished = true;
+    this.error = error;
+    notifyAll();
+  }
+
+  @Override
+  public synchronized void onComplete() {
+    finished = true;
+    notifyAll();
+  }
+
+  public synchronized Result take() throws IOException, InterruptedException {
+    for (;;) {
+      if (!queue.isEmpty()) {
+        return queue.poll();
+      }
+      if (finished) {
+        if (error != null) {
+          Throwables.propagateIfPossible(error, IOException.class);
+          throw new IOException(error);
+        } else {
+          return null;
+        }
+      }
+      wait();
+    }
+  }
+
+  public ScanMetrics getScanMetrics() {
+    return scanMetrics;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java
new file mode 100644
index 0000000..168129d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.base.Throwables;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+
+final class SimpleScanResultConsumer implements ScanResultConsumer {
+
+  private ScanMetrics scanMetrics;
+
+  private final List<Result> results = new ArrayList<>();
+
+  private Throwable error;
+
+  private boolean finished = false;
+
+  @Override
+  public void onScanMetricsCreated(ScanMetrics scanMetrics) {
+    this.scanMetrics = scanMetrics;
+  }
+
+  @Override
+  public synchronized boolean onNext(Result result) {
+    results.add(result);
+    return true;
+  }
+
+  @Override
+  public synchronized void onError(Throwable error) {
+    this.error = error;
+    finished = true;
+    notifyAll();
+  }
+
+  @Override
+  public synchronized void onComplete() {
+    finished = true;
+    notifyAll();
+  }
+
+  public synchronized List<Result> getAll() throws Exception {
+    while (!finished) {
+      wait();
+    }
+    if (error != null) {
+      Throwables.propagateIfPossible(error, Exception.class);
+      throw new Exception(error);
+    }
+    return results;
+  }
+
+  public ScanMetrics getScanMetrics() {
+    return scanMetrics;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
index a8aad0b..2e64593 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.base.Throwables;
-
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ForkJoinPool;
 import java.util.function.Supplier;
@@ -37,45 +34,6 @@ import org.junit.runners.Parameterized.Parameters;
 @Category({ LargeTests.class, ClientTests.class })
 public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
 
-  private static final class SimpleScanResultConsumer implements ScanResultConsumer {
-
-    private final List<Result> results = new ArrayList<>();
-
-    private Throwable error;
-
-    private boolean finished = false;
-
-    @Override
-    public synchronized boolean onNext(Result result) {
-      results.add(result);
-      return true;
-    }
-
-    @Override
-    public synchronized void onError(Throwable error) {
-      this.error = error;
-      finished = true;
-      notifyAll();
-    }
-
-    @Override
-    public synchronized void onComplete() {
-      finished = true;
-      notifyAll();
-    }
-
-    public synchronized List<Result> getAll() throws Exception {
-      while (!finished) {
-        wait();
-      }
-      if (error != null) {
-        Throwables.propagateIfPossible(error, Exception.class);
-        throw new Exception(error);
-      }
-      return results;
-    }
-  }
-
   @Parameter(0)
   public String scanType;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
new file mode 100644
index 0000000..b877dac
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScanMetrics {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("ScanMetrics");
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  private static final byte[] VALUE = Bytes.toBytes("value");
+
+  private static AsyncConnection CONN;
+
+  private static int NUM_REGIONS;
+
+  @FunctionalInterface
+  private interface ScanWithMetrics {
+    Pair<List<Result>, ScanMetrics> scan(Scan scan) throws Exception;
+  }
+
+  @Parameter(0)
+  public String methodName;
+
+  @Parameter(1)
+  public ScanWithMetrics method;
+
+  @Parameters(name = "{index}: scan={0}")
+  public static List<Object[]> params() {
+    ScanWithMetrics doScanWithRawAsyncTable = TestAsyncTableScanMetrics::doScanWithRawAsyncTable;
+    ScanWithMetrics doScanWithAsyncTableScan = TestAsyncTableScanMetrics::doScanWithAsyncTableScan;
+    ScanWithMetrics doScanWithAsyncTableScanner =
+        TestAsyncTableScanMetrics::doScanWithAsyncTableScanner;
+    return Arrays.asList(new Object[] { "doScanWithRawAsyncTable", doScanWithRawAsyncTable },
+      new Object[] { "doScanWithAsyncTableScan", doScanWithAsyncTableScan },
+      new Object[] { "doScanWithAsyncTableScanner", doScanWithAsyncTableScanner });
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(3);
+    // Create 3 rows in the table, with rowkeys starting with "zzz*" so that
+    // scan are forced to hit all the regions.
+    try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
+      table.put(Arrays.asList(new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE),
+        new Put(Bytes.toBytes("zzz2")).addColumn(CF, CQ, VALUE),
+        new Put(Bytes.toBytes("zzz3")).addColumn(CF, CQ, VALUE)));
+    }
+    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+    NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IOUtils.closeQuietly(CONN);
+    UTIL.shutdownMiniCluster();
+  }
+
+  private static Pair<List<Result>, ScanMetrics> doScanWithRawAsyncTable(Scan scan)
+      throws IOException, InterruptedException {
+    SimpleRawScanResultConsumer consumer = new SimpleRawScanResultConsumer();
+    CONN.getRawTable(TABLE_NAME).scan(scan, consumer);
+    List<Result> results = new ArrayList<>();
+    for (Result result; (result = consumer.take()) != null;) {
+      results.add(result);
+    }
+    return Pair.newPair(results, consumer.getScanMetrics());
+  }
+
+  private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScan(Scan scan)
+      throws Exception {
+    SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
+    CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer);
+    return Pair.newPair(consumer.getAll(), consumer.getScanMetrics());
+  }
+
+  private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScanner(Scan scan)
+      throws IOException {
+    try (ResultScanner scanner =
+        CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan)) {
+      List<Result> results = new ArrayList<>();
+      for (Result result; (result = scanner.next()) != null;) {
+        results.add(result);
+      }
+      return Pair.newPair(results, scanner.getScanMetrics());
+    }
+  }
+
+  @Test
+  public void testNoScanMetrics() throws Exception {
+    Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
+    assertEquals(3, pair.getFirst().size());
+    assertNull(pair.getSecond());
+  }
+
+  @Test
+  public void testScanMetrics() throws Exception {
+    Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan().setScanMetricsEnabled(true));
+    List<Result> results = pair.getFirst();
+    assertEquals(3, results.size());
+    long bytes = results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
+        .mapToLong(c -> CellUtil.estimatedSerializedSizeOf(c)).sum();
+    ScanMetrics scanMetrics = pair.getSecond();
+    assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+    assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
+    // also assert a server side metric to ensure that we have published them into the client side
+    // metrics.
+    assertEquals(3, scanMetrics.countOfRowsScanned.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b4bb821/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
index 72179c8..5311ca2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
@@ -17,13 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.base.Throwables;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Queue;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -39,53 +34,6 @@ import org.junit.runners.Parameterized.Parameters;
 @Category({ MediumTests.class, ClientTests.class })
 public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
 
-  private static final class SimpleRawScanResultConsumer implements RawScanResultConsumer {
-
-    private final Queue<Result> queue = new ArrayDeque<>();
-
-    private boolean finished;
-
-    private Throwable error;
-
-    @Override
-    public synchronized void onNext(Result[] results, ScanController controller) {
-      for (Result result : results) {
-        queue.offer(result);
-      }
-      notifyAll();
-    }
-
-    @Override
-    public synchronized void onError(Throwable error) {
-      finished = true;
-      this.error = error;
-      notifyAll();
-    }
-
-    @Override
-    public synchronized void onComplete() {
-      finished = true;
-      notifyAll();
-    }
-
-    public synchronized Result take() throws IOException, InterruptedException {
-      for (;;) {
-        if (!queue.isEmpty()) {
-          return queue.poll();
-        }
-        if (finished) {
-          if (error != null) {
-            Throwables.propagateIfPossible(error, IOException.class);
-            throw new IOException(error);
-          } else {
-            return null;
-          }
-        }
-        wait();
-      }
-    }
-  }
-
   @Parameter(0)
   public String scanType;