You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/05/21 18:43:21 UTC

[38/50] [abbrv] hbase git commit: HBASE-5980 Scanner responses from RS should include metrics on rows/KVs filtered

HBASE-5980 Scanner responses from RS should include metrics on rows/KVs filtered


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dc72dad7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dc72dad7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dc72dad7

Branch: refs/heads/hbase-12439
Commit: dc72dad7cd5214f2dd416b89e4bfa1c83625aff0
Parents: 77d9719
Author: stack <st...@apache.org>
Authored: Tue May 19 17:03:56 2015 -0700
Committer: stack <st...@apache.org>
Committed: Wed May 20 11:55:06 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/ScannerCallable.java    |  24 +-
 .../hbase/client/metrics/ScanMetrics.java       |  53 +-
 .../client/metrics/ServerSideScanMetrics.java   | 118 ++++
 .../hadoop/hbase/protobuf/RequestConverter.java |  16 +-
 .../hbase/protobuf/ResponseConverter.java       |  26 +
 .../hbase/protobuf/generated/ClientProtos.java  | 618 +++++++++++++++----
 hbase-protocol/src/main/protobuf/Client.proto   |   9 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  37 +-
 .../regionserver/NoLimitScannerContext.java     |   2 +-
 .../hbase/regionserver/RSRpcServices.java       |  23 +
 .../regionserver/ReversedRegionScannerImpl.java |   7 +-
 .../hbase/regionserver/ScannerContext.java      |  31 +-
 ...TestServerSideScanMetricsFromClientSide.java | 324 ++++++++++
 hbase-shell/src/main/ruby/hbase.rb              |   2 +
 hbase-shell/src/main/ruby/hbase/table.rb        |  13 +-
 .../src/main/ruby/shell/commands/scan.rb        |  17 +-
 hbase-shell/src/main/ruby/shell/formatter.rb    |  31 +
 17 files changed, 1158 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/dc72dad7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 85352ff..afce287 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -202,7 +204,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
         setHeartbeatMessage(false);
         try {
           incRPCcallsMetrics();
-          request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
+          request =
+              RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
+                this.scanMetrics != null);
           ScanResponse response = null;
           controller = controllerFactory.newController();
           controller.setPriority(getTableName());
@@ -232,6 +236,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
                   + rows + " rows from scanner=" + scannerId);
               }
             }
+            updateServerSideMetrics(response);
             // moreResults is only used for the case where a filter exhausts all elements
             if (response.hasMoreResults() && !response.getMoreResults()) {
               scannerId = -1L;
@@ -341,6 +346,21 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     }
   }
 
+  /**
+   * Use the scan metrics returned by the server to add to the identically named counters in the
+   * client side metrics. If a counter does not exist with the same name as the server side metric,
+   * the attempt to increase the counter is silently ignored.
+   * @param response the scan response whose server side metrics, if present, are merged in
+   */
+  private void updateServerSideMetrics(ScanResponse response) {
+    if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;
+
+    Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
+    for (Entry<String, Long> entry : serverMetrics.entrySet()) {
+      this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
+    }
+  }
+
   private void close() {
     if (this.scannerId == -1L) {
       return;
@@ -348,7 +368,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     try {
       incRPCcallsMetrics();
       ScanRequest request =
-        RequestConverter.buildScanRequest(this.scannerId, 0, true);
+          RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
       try {
         getStub().scan(null, request);
       } catch (ServiceException se) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dc72dad7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
index 35c6667..ec2c937 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
@@ -18,41 +18,32 @@
 
 package org.apache.hadoop.hbase.client.metrics;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
-import com.google.common.collect.ImmutableMap;
-
 
 /**
- * Provides client-side metrics related to scan operations.
+ * Provides metrics related to scan operations (both server side and client side metrics).
+ * <p>
  * The data can be passed to mapreduce framework or other systems.
  * We use atomic longs so that one thread can increment,
  * while another atomically resets to zero after the values are reported
  * to hadoop's counters.
- *
+ * <p>
  * Some of these metrics are general for any client operation such as put
  * However, there is no need for this. So they are defined under scan operation
  * for now.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class ScanMetrics {
-
-  /**
-   * Hash to hold the String -> Atomic Long mappings.
-   */
-  private final Map<String, AtomicLong> counters = new HashMap<String, AtomicLong>();
+public class ScanMetrics extends ServerSideScanMetrics {
 
-  // AtomicLongs to hold the metrics values.  These are all updated through ClientScanner and
-  // ScannerCallable.  They are atomic longs so that atomic getAndSet can be used to reset the
+  // AtomicLongs to hold the metrics values. These are all updated through ClientScanner and
+  // ScannerCallable. They are atomic longs so that atomic getAndSet can be used to reset the
   // values after progress is passed to hadoop's counters.
 
-
   /**
    * number of RPC calls
    */
@@ -103,36 +94,4 @@ public class ScanMetrics {
    */
   public ScanMetrics() {
   }
-
-  private AtomicLong createCounter(String counterName) {
-    AtomicLong c = new AtomicLong(0);
-    counters.put(counterName, c);
-    return c;
-  }
-
-  public void setCounter(String counterName, long value) {
-    AtomicLong c = this.counters.get(counterName);
-    if (c != null) {
-      c.set(value);
-    }
-  }
-
-  /**
-   * Get all of the values since the last time this function was called.
-   *
-   * Calling this function will reset all AtomicLongs in the instance back to 0.
-   *
-   * @return A Map of String -> Long for metrics
-   */
-  public Map<String, Long> getMetricsMap() {
-    //Create a builder
-    ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
-    //For every entry add the value and reset the AtomicLong back to zero
-    for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
-      builder.put(e.getKey(), e.getValue().getAndSet(0));
-    }
-    //Build the immutable map so that people can't mess around with it.
-    return builder.build();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/dc72dad7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
new file mode 100644
index 0000000..c971c73
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Provides server side metrics related to scan operations.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ServerSideScanMetrics {
+  /**
+   * Map from metric name to the {@link AtomicLong} that holds the value of that metric
+   */
+  private final Map<String, AtomicLong> counters = new HashMap<String, AtomicLong>();
+
+  /**
+   * Create a new counter with the specified name, initialized to zero
+   * @param counterName name under which the new counter is registered
+   * @return {@link AtomicLong} instance for the counter with counterName
+   */
+  protected AtomicLong createCounter(String counterName) {
+    AtomicLong c = new AtomicLong(0);
+    counters.put(counterName, c);
+    return c;
+  }
+
+  public static final String COUNT_OF_ROWS_SCANNED_KEY = "ROWS_SCANNED";
+  public static final String COUNT_OF_ROWS_FILTERED_KEY = "ROWS_FILTERED";
+
+  /**
+   * number of rows filtered during scan RPC
+   */
+  public final AtomicLong countOfRowsFiltered = createCounter(COUNT_OF_ROWS_FILTERED_KEY);
+
+  /**
+   * number of rows scanned during scan RPC. Not every row scanned will be returned to the client
+   * since rows may be filtered.
+   */
+  public final AtomicLong countOfRowsScanned = createCounter(COUNT_OF_ROWS_SCANNED_KEY);
+
+  /**
+   * @param counterName name of the counter to set; unknown names are silently ignored
+   * @param value the value to set the counter to
+   */
+  public void setCounter(String counterName, long value) {
+    AtomicLong c = this.counters.get(counterName);
+    if (c != null) {
+      c.set(value);
+    }
+  }
+
+  /**
+   * @param counterName
+   * @return true if a counter exists with the counterName
+   */
+  public boolean hasCounter(String counterName) {
+    return this.counters.containsKey(counterName);
+  }
+
+  /**
+   * @param counterName
+   * @return {@link AtomicLong} instance for this counter name, null if counter does not exist.
+   */
+  public AtomicLong getCounter(String counterName) {
+    return this.counters.get(counterName);
+  }
+
+  /**
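+   * @param counterName name of the counter to increase; unknown names are silently ignored
+   * @param delta the amount to add to the counter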
+   */
+  public void addToCounter(String counterName, long delta) {
+    AtomicLong c = this.counters.get(counterName);
+    if (c != null) {
+      c.addAndGet(delta);
+    }
+  }
+
+  /**
+   * Get all of the values since the last time this function was called. Calling this function will
+   * reset all AtomicLongs in the instance back to 0.
+   * @return an immutable map from metric name to the value accumulated since the last call
+   */
+  public Map<String, Long> getMetricsMap() {
+    // Create a builder
+    ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
+    // For every entry add the value and reset the AtomicLong back to zero
+    for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
+      builder.put(e.getKey(), e.getValue().getAndSet(0));
+    }
+    // Build the immutable map so that people can't mess around with it.
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/dc72dad7/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index b0dac2f..c4f6a97 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -61,6 +60,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
@@ -478,9 +478,8 @@ public final class RequestConverter {
    * @return a scan request
    * @throws IOException
    */
-  public static ScanRequest buildScanRequest(final byte[] regionName,
-      final Scan scan, final int numberOfRows,
-        final boolean closeScanner) throws IOException {
+  public static ScanRequest buildScanRequest(final byte[] regionName, final Scan scan,
+      final int numberOfRows, final boolean closeScanner) throws IOException {
     ScanRequest.Builder builder = ScanRequest.newBuilder();
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
@@ -490,6 +489,7 @@ public final class RequestConverter {
     builder.setScan(ProtobufUtil.toScan(scan));
     builder.setClientHandlesPartials(true);
     builder.setClientHandlesHeartbeats(true);
+    builder.setTrackScanMetrics(scan != null && scan.isScanMetricsEnabled());
     return builder.build();
   }
 
@@ -501,14 +501,15 @@ public final class RequestConverter {
    * @param closeScanner
    * @return a scan request
    */
-  public static ScanRequest buildScanRequest(final long scannerId,
-      final int numberOfRows, final boolean closeScanner) {
+  public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
+      final boolean closeScanner, final boolean trackMetrics) {
     ScanRequest.Builder builder = ScanRequest.newBuilder();
     builder.setNumberOfRows(numberOfRows);
     builder.setCloseScanner(closeScanner);
     builder.setScannerId(scannerId);
     builder.setClientHandlesPartials(true);
     builder.setClientHandlesHeartbeats(true);
+    builder.setTrackScanMetrics(trackMetrics);
     return builder.build();
   }
 
@@ -522,7 +523,7 @@ public final class RequestConverter {
    * @return a scan request
    */
   public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
-      final boolean closeScanner, final long nextCallSeq) {
+      final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics) {
     ScanRequest.Builder builder = ScanRequest.newBuilder();
     builder.setNumberOfRows(numberOfRows);
     builder.setCloseScanner(closeScanner);
@@ -530,6 +531,7 @@ public final class RequestConverter {
     builder.setNextCallSeq(nextCallSeq);
     builder.setClientHandlesPartials(true);
     builder.setClientHandlesHeartbeats(true);
+    builder.setTrackScanMetrics(trackMetrics);
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/dc72dad7/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 4b64697..177b1c7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.protobuf;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,6 +50,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
+import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
@@ -395,4 +399,26 @@ public final class ResponseConverter {
     }
     return results;
   }
+
+  public static Map<String, Long> getScanMetrics(ScanResponse response) {
+    Map<String, Long> metricMap = new HashMap<String, Long>();
+    if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) {
+      return metricMap;
+    }
+    
+    ScanMetrics metrics = response.getScanMetrics();
+    int numberOfMetrics = metrics.getMetricsCount();
+    for (int i = 0; i < numberOfMetrics; i++) {
+      NameInt64Pair metricPair = metrics.getMetrics(i);
+      if (metricPair != null) {
+        String name = metricPair.getName();
+        Long value = metricPair.getValue();
+        if (name != null && value != null) {
+          metricMap.put(name, value);
+        }
+      }
+    }
+
+    return metricMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/dc72dad7/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 2991ece..55767c7 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -16443,6 +16443,16 @@ public final class ClientProtos {
      * <code>optional bool client_handles_heartbeats = 8;</code>
      */
     boolean getClientHandlesHeartbeats();
+
+    // optional bool track_scan_metrics = 9;
+    /**
+     * <code>optional bool track_scan_metrics = 9;</code>
+     */
+    boolean hasTrackScanMetrics();
+    /**
+     * <code>optional bool track_scan_metrics = 9;</code>
+     */
+    boolean getTrackScanMetrics();
   }
   /**
    * Protobuf type {@code ScanRequest}
@@ -16564,6 +16574,11 @@ public final class ClientProtos {
               clientHandlesHeartbeats_ = input.readBool();
               break;
             }
+            case 72: {
+              bitField0_ |= 0x00000100;
+              trackScanMetrics_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -16744,6 +16759,22 @@ public final class ClientProtos {
       return clientHandlesHeartbeats_;
     }
 
+    // optional bool track_scan_metrics = 9;
+    public static final int TRACK_SCAN_METRICS_FIELD_NUMBER = 9;
+    private boolean trackScanMetrics_;
+    /**
+     * <code>optional bool track_scan_metrics = 9;</code>
+     */
+    public boolean hasTrackScanMetrics() {
+      return ((bitField0_ & 0x00000100) == 0x00000100);
+    }
+    /**
+     * <code>optional bool track_scan_metrics = 9;</code>
+     */
+    public boolean getTrackScanMetrics() {
+      return trackScanMetrics_;
+    }
+
     private void initFields() {
       region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
       scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance();
@@ -16753,6 +16784,7 @@ public final class ClientProtos {
       nextCallSeq_ = 0L;
       clientHandlesPartials_ = false;
       clientHandlesHeartbeats_ = false;
+      trackScanMetrics_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -16802,6 +16834,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000080) == 0x00000080)) {
         output.writeBool(8, clientHandlesHeartbeats_);
       }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        output.writeBool(9, trackScanMetrics_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -16843,6 +16878,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(8, clientHandlesHeartbeats_);
       }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(9, trackScanMetrics_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -16906,6 +16945,11 @@ public final class ClientProtos {
         result = result && (getClientHandlesHeartbeats()
             == other.getClientHandlesHeartbeats());
       }
+      result = result && (hasTrackScanMetrics() == other.hasTrackScanMetrics());
+      if (hasTrackScanMetrics()) {
+        result = result && (getTrackScanMetrics()
+            == other.getTrackScanMetrics());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -16951,6 +16995,10 @@ public final class ClientProtos {
         hash = (37 * hash) + CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeats());
       }
+      if (hasTrackScanMetrics()) {
+        hash = (37 * hash) + TRACK_SCAN_METRICS_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getTrackScanMetrics());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -17099,6 +17147,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000040);
         clientHandlesHeartbeats_ = false;
         bitField0_ = (bitField0_ & ~0x00000080);
+        trackScanMetrics_ = false;
+        bitField0_ = (bitField0_ & ~0x00000100);
         return this;
       }
 
@@ -17167,6 +17217,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000080;
         }
         result.clientHandlesHeartbeats_ = clientHandlesHeartbeats_;
+        if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+          to_bitField0_ |= 0x00000100;
+        }
+        result.trackScanMetrics_ = trackScanMetrics_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -17207,6 +17261,9 @@ public final class ClientProtos {
         if (other.hasClientHandlesHeartbeats()) {
           setClientHandlesHeartbeats(other.getClientHandlesHeartbeats());
         }
+        if (other.hasTrackScanMetrics()) {
+          setTrackScanMetrics(other.getTrackScanMetrics());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -17678,6 +17735,39 @@ public final class ClientProtos {
         return this;
       }
 
+      // optional bool track_scan_metrics = 9;
+      private boolean trackScanMetrics_ ;
+      /**
+       * <code>optional bool track_scan_metrics = 9;</code>
+       */
+      public boolean hasTrackScanMetrics() {
+        return ((bitField0_ & 0x00000100) == 0x00000100);
+      }
+      /**
+       * <code>optional bool track_scan_metrics = 9;</code>
+       */
+      public boolean getTrackScanMetrics() {
+        return trackScanMetrics_;
+      }
+      /**
+       * <code>optional bool track_scan_metrics = 9;</code>
+       */
+      public Builder setTrackScanMetrics(boolean value) {
+        bitField0_ |= 0x00000100;
+        trackScanMetrics_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool track_scan_metrics = 9;</code>
+       */
+      public Builder clearTrackScanMetrics() {
+        bitField0_ = (bitField0_ & ~0x00000100);
+        trackScanMetrics_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:ScanRequest)
     }
 
@@ -17920,6 +18010,38 @@ public final class ClientProtos {
      * </pre>
      */
     boolean getHeartbeatMessage();
+
+    // optional .ScanMetrics scan_metrics = 10;
+    /**
+     * <code>optional .ScanMetrics scan_metrics = 10;</code>
+     *
+     * <pre>
+     * This field is filled in if the client has requested that scan metrics be tracked.
+     * The metrics tracked here are sent back to the client to be tracked together with 
+     * the existing client side metrics.
+     * </pre>
+     */
+    boolean hasScanMetrics();
+    /**
+     * <code>optional .ScanMetrics scan_metrics = 10;</code>
+     *
+     * <pre>
+     * This field is filled in if the client has requested that scan metrics be tracked.
+     * The metrics tracked here are sent back to the client to be tracked together with 
+     * the existing client side metrics.
+     * </pre>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics();
+    /**
+     * <code>optional .ScanMetrics scan_metrics = 10;</code>
+     *
+     * <pre>
+     * This field is filled in if the client has requested that scan metrics be tracked.
+     * The metrics tracked here are sent back to the client to be tracked together with 
+     * the existing client side metrics.
+     * </pre>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder();
   }
   /**
    * Protobuf type {@code ScanResponse}
@@ -18058,6 +18180,19 @@ public final class ClientProtos {
               heartbeatMessage_ = input.readBool();
               break;
             }
+            case 82: {
+              org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder subBuilder = null;
+              if (((bitField0_ & 0x00000040) == 0x00000040)) {
+                subBuilder = scanMetrics_.toBuilder();
+              }
+              scanMetrics_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.PARSER, extensionRegistry);
+              if (subBuilder != null) {
+                subBuilder.mergeFrom(scanMetrics_);
+                scanMetrics_ = subBuilder.buildPartial();
+              }
+              bitField0_ |= 0x00000040;
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -18401,6 +18536,46 @@ public final class ClientProtos {
       return heartbeatMessage_;
     }
 
+    // optional .ScanMetrics scan_metrics = 10;
+    public static final int SCAN_METRICS_FIELD_NUMBER = 10;
+    private org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics scanMetrics_;
+    /**
+     * <code>optional .ScanMetrics scan_metrics = 10;</code>
+     *
+     * <pre>
+     * This field is filled in if the client has requested that scan metrics be tracked.
+     * The metrics tracked here are sent back to the client to be tracked together with 
+     * the existing client side metrics.
+     * </pre>
+     */
+    public boolean hasScanMetrics() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional .ScanMetrics scan_metrics = 10;</code>
+     *
+     * <pre>
+     * This field is filled in if the client has requested that scan metrics be tracked.
+     * The metrics tracked here are sent back to the client to be tracked together with 
+     * the existing client side metrics.
+     * </pre>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics() {
+      return scanMetrics_;
+    }
+    /**
+     * <code>optional .ScanMetrics scan_metrics = 10;</code>
+     *
+     * <pre>
+     * This field is filled in if the client has requested that scan metrics be tracked.
+     * The metrics tracked here are sent back to the client to be tracked together with 
+     * the existing client side metrics.
+     * </pre>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder() {
+      return scanMetrics_;
+    }
+
     private void initFields() {
       cellsPerResult_ = java.util.Collections.emptyList();
       scannerId_ = 0L;
@@ -18411,6 +18586,7 @@ public final class ClientProtos {
       partialFlagPerResult_ = java.util.Collections.emptyList();
       moreResultsInRegion_ = false;
       heartbeatMessage_ = false;
+      scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -18451,6 +18627,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeBool(9, heartbeatMessage_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeMessage(10, scanMetrics_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -18503,6 +18682,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(9, heartbeatMessage_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(10, scanMetrics_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -18562,6 +18745,11 @@ public final class ClientProtos {
         result = result && (getHeartbeatMessage()
             == other.getHeartbeatMessage());
       }
+      result = result && (hasScanMetrics() == other.hasScanMetrics());
+      if (hasScanMetrics()) {
+        result = result && getScanMetrics()
+            .equals(other.getScanMetrics());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -18611,6 +18799,10 @@ public final class ClientProtos {
         hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getHeartbeatMessage());
       }
+      if (hasScanMetrics()) {
+        hash = (37 * hash) + SCAN_METRICS_FIELD_NUMBER;
+        hash = (53 * hash) + getScanMetrics().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -18719,6 +18911,7 @@ public final class ClientProtos {
       private void maybeForceBuilderInitialization() {
         if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
           getResultsFieldBuilder();
+          getScanMetricsFieldBuilder();
         }
       }
       private static Builder create() {
@@ -18749,6 +18942,12 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000080);
         heartbeatMessage_ = false;
         bitField0_ = (bitField0_ & ~0x00000100);
+        if (scanMetricsBuilder_ == null) {
+          scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance();
+        } else {
+          scanMetricsBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000200);
         return this;
       }
 
@@ -18820,6 +19019,14 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000020;
         }
         result.heartbeatMessage_ = heartbeatMessage_;
+        if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        if (scanMetricsBuilder_ == null) {
+          result.scanMetrics_ = scanMetrics_;
+        } else {
+          result.scanMetrics_ = scanMetricsBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -18900,6 +19107,9 @@ public final class ClientProtos {
         if (other.hasHeartbeatMessage()) {
           setHeartbeatMessage(other.getHeartbeatMessage());
         }
+        if (other.hasScanMetrics()) {
+          mergeScanMetrics(other.getScanMetrics());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -19797,6 +20007,177 @@ public final class ClientProtos {
         return this;
       }
 
+      // optional .ScanMetrics scan_metrics = 10;
+      private org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder> scanMetricsBuilder_;
+      /**
+       * <code>optional .ScanMetrics scan_metrics = 10;</code>
+       *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+       */
+      public boolean hasScanMetrics() {
+        return ((bitField0_ & 0x00000200) == 0x00000200);
+      }
+      /**
+       * <code>optional .ScanMetrics scan_metrics = 10;</code>
+       *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics() {
+        if (scanMetricsBuilder_ == null) {
+          return scanMetrics_;
+        } else {
+          return scanMetricsBuilder_.getMessage();
+        }
+      }
+      /**
+       * <code>optional .ScanMetrics scan_metrics = 10;</code>
+       *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+       */
+      public Builder setScanMetrics(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics value) {
+        if (scanMetricsBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          scanMetrics_ = value;
+          onChanged();
+        } else {
+          scanMetricsBuilder_.setMessage(value);
+        }
+        bitField0_ |= 0x00000200;
+        return this;
+      }
+      /**
+       * <code>optional .ScanMetrics scan_metrics = 10;</code>
+       *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+       */
+      public Builder setScanMetrics(
+          org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder builderForValue) {
+        if (scanMetricsBuilder_ == null) {
+          scanMetrics_ = builderForValue.build();
+          onChanged();
+        } else {
+          scanMetricsBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000200;
+        return this;
+      }
+      /**
+       * <code>optional .ScanMetrics scan_metrics = 10;</code>
+       *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+       */
+      public Builder mergeScanMetrics(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics value) {
+        if (scanMetricsBuilder_ == null) {
+          if (((bitField0_ & 0x00000200) == 0x00000200) &&
+              scanMetrics_ != org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance()) {
+            scanMetrics_ =
+              org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.newBuilder(scanMetrics_).mergeFrom(value).buildPartial();
+          } else {
+            scanMetrics_ = value;
+          }
+          onChanged();
+        } else {
+          scanMetricsBuilder_.mergeFrom(value);
+        }
+        bitField0_ |= 0x00000200;
+        return this;
+      }
+      /**
+       * <code>optional .ScanMetrics scan_metrics = 10;</code>
+       *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+       */
+      public Builder clearScanMetrics() {
+        if (scanMetricsBuilder_ == null) {
+          scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance();
+          onChanged();
+        } else {
+          scanMetricsBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000200);
+        return this;
+      }
+      /**
+       * <code>optional .ScanMetrics scan_metrics = 10;</code>
+       *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder getScanMetricsBuilder() {
+        bitField0_ |= 0x00000200;
+        onChanged();
+        return getScanMetricsFieldBuilder().getBuilder();
+      }
+      /**
+       * <code>optional .ScanMetrics scan_metrics = 10;</code>
+       *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder() {
+        if (scanMetricsBuilder_ != null) {
+          return scanMetricsBuilder_.getMessageOrBuilder();
+        } else {
+          return scanMetrics_;
+        }
+      }
+      /**
+       * <code>optional .ScanMetrics scan_metrics = 10;</code>
+       *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+       */
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder> 
+          getScanMetricsFieldBuilder() {
+        if (scanMetricsBuilder_ == null) {
+          scanMetricsBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder>(
+                  scanMetrics_,
+                  getParentForChildren(),
+                  isClean());
+          scanMetrics_ = null;
+        }
+        return scanMetricsBuilder_;
+      }
+
       // @@protoc_insertion_point(builder_scope:ScanResponse)
     }
 
@@ -32868,122 +33249,124 @@ public final class ClientProtos {
   static {
     java.lang.String[] descriptorData = {
       "\n\014Client.proto\032\013HBase.proto\032\014Filter.prot" +
-      "o\032\nCell.proto\032\020Comparator.proto\"\037\n\016Autho" +
-      "rizations\022\r\n\005label\030\001 \003(\t\"$\n\016CellVisibili" +
-      "ty\022\022\n\nexpression\030\001 \002(\t\"+\n\006Column\022\016\n\006fami" +
-      "ly\030\001 \002(\014\022\021\n\tqualifier\030\002 \003(\014\"\324\002\n\003Get\022\013\n\003r" +
-      "ow\030\001 \002(\014\022\027\n\006column\030\002 \003(\0132\007.Column\022!\n\tatt" +
-      "ribute\030\003 \003(\0132\016.NameBytesPair\022\027\n\006filter\030\004" +
-      " \001(\0132\007.Filter\022\036\n\ntime_range\030\005 \001(\0132\n.Time" +
-      "Range\022\027\n\014max_versions\030\006 \001(\r:\0011\022\032\n\014cache_" +
-      "blocks\030\007 \001(\010:\004true\022\023\n\013store_limit\030\010 \001(\r\022",
-      "\024\n\014store_offset\030\t \001(\r\022\035\n\016existence_only\030" +
-      "\n \001(\010:\005false\022!\n\022closest_row_before\030\013 \001(\010" +
-      ":\005false\022)\n\013consistency\030\014 \001(\0162\014.Consisten" +
-      "cy:\006STRONG\"z\n\006Result\022\023\n\004cell\030\001 \003(\0132\005.Cel" +
-      "l\022\035\n\025associated_cell_count\030\002 \001(\005\022\016\n\006exis" +
-      "ts\030\003 \001(\010\022\024\n\005stale\030\004 \001(\010:\005false\022\026\n\007partia" +
-      "l\030\005 \001(\010:\005false\"A\n\nGetRequest\022 \n\006region\030\001" +
-      " \002(\0132\020.RegionSpecifier\022\021\n\003get\030\002 \002(\0132\004.Ge" +
-      "t\"&\n\013GetResponse\022\027\n\006result\030\001 \001(\0132\007.Resul" +
-      "t\"\200\001\n\tCondition\022\013\n\003row\030\001 \002(\014\022\016\n\006family\030\002",
-      " \002(\014\022\021\n\tqualifier\030\003 \002(\014\022\"\n\014compare_type\030" +
-      "\004 \002(\0162\014.CompareType\022\037\n\ncomparator\030\005 \002(\0132" +
-      "\013.Comparator\"\265\006\n\rMutationProto\022\013\n\003row\030\001 " +
-      "\001(\014\0220\n\013mutate_type\030\002 \001(\0162\033.MutationProto" +
-      ".MutationType\0220\n\014column_value\030\003 \003(\0132\032.Mu" +
-      "tationProto.ColumnValue\022\021\n\ttimestamp\030\004 \001" +
-      "(\004\022!\n\tattribute\030\005 \003(\0132\016.NameBytesPair\022:\n" +
-      "\ndurability\030\006 \001(\0162\031.MutationProto.Durabi" +
-      "lity:\013USE_DEFAULT\022\036\n\ntime_range\030\007 \001(\0132\n." +
-      "TimeRange\022\035\n\025associated_cell_count\030\010 \001(\005",
-      "\022\r\n\005nonce\030\t \001(\004\032\347\001\n\013ColumnValue\022\016\n\006famil" +
-      "y\030\001 \002(\014\022B\n\017qualifier_value\030\002 \003(\0132).Mutat" +
-      "ionProto.ColumnValue.QualifierValue\032\203\001\n\016" +
-      "QualifierValue\022\021\n\tqualifier\030\001 \001(\014\022\r\n\005val" +
-      "ue\030\002 \001(\014\022\021\n\ttimestamp\030\003 \001(\004\022.\n\013delete_ty" +
-      "pe\030\004 \001(\0162\031.MutationProto.DeleteType\022\014\n\004t" +
-      "ags\030\005 \001(\014\"W\n\nDurability\022\017\n\013USE_DEFAULT\020\000" +
-      "\022\014\n\010SKIP_WAL\020\001\022\r\n\tASYNC_WAL\020\002\022\014\n\010SYNC_WA" +
-      "L\020\003\022\r\n\tFSYNC_WAL\020\004\">\n\014MutationType\022\n\n\006AP" +
-      "PEND\020\000\022\r\n\tINCREMENT\020\001\022\007\n\003PUT\020\002\022\n\n\006DELETE",
-      "\020\003\"p\n\nDeleteType\022\026\n\022DELETE_ONE_VERSION\020\000" +
-      "\022\034\n\030DELETE_MULTIPLE_VERSIONS\020\001\022\021\n\rDELETE" +
-      "_FAMILY\020\002\022\031\n\025DELETE_FAMILY_VERSION\020\003\"\207\001\n" +
-      "\rMutateRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" +
-      "pecifier\022 \n\010mutation\030\002 \002(\0132\016.MutationPro" +
-      "to\022\035\n\tcondition\030\003 \001(\0132\n.Condition\022\023\n\013non" +
-      "ce_group\030\004 \001(\004\"<\n\016MutateResponse\022\027\n\006resu" +
-      "lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\271\003\n" +
-      "\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattrib" +
-      "ute\030\002 \003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003",
-      " \001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007" +
-      ".Filter\022\036\n\ntime_range\030\006 \001(\0132\n.TimeRange\022" +
-      "\027\n\014max_versions\030\007 \001(\r:\0011\022\032\n\014cache_blocks" +
-      "\030\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017max_" +
-      "result_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(\r\022\024" +
-      "\n\014store_offset\030\014 \001(\r\022&\n\036load_column_fami" +
-      "lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" +
-      "eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" +
-      "\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" +
-      "\"\342\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio",
-      "nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" +
-      "er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" +
-      "lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" +
-      "\004\022\037\n\027client_handles_partials\030\007 \001(\010\022!\n\031cl" +
-      "ient_handles_heartbeats\030\010 \001(\010\"\344\001\n\014ScanRe" +
-      "sponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscan" +
-      "ner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003tt" +
-      "l\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r\n\005sta" +
-      "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" +
-      "\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea",
-      "rtbeat_message\030\t \001(\010\"\263\001\n\024BulkLoadHFileRe" +
+      "o\032\nCell.proto\032\020Comparator.proto\032\017MapRedu" +
+      "ce.proto\"\037\n\016Authorizations\022\r\n\005label\030\001 \003(" +
+      "\t\"$\n\016CellVisibility\022\022\n\nexpression\030\001 \002(\t\"" +
+      "+\n\006Column\022\016\n\006family\030\001 \002(\014\022\021\n\tqualifier\030\002" +
+      " \003(\014\"\324\002\n\003Get\022\013\n\003row\030\001 \002(\014\022\027\n\006column\030\002 \003(" +
+      "\0132\007.Column\022!\n\tattribute\030\003 \003(\0132\016.NameByte" +
+      "sPair\022\027\n\006filter\030\004 \001(\0132\007.Filter\022\036\n\ntime_r" +
+      "ange\030\005 \001(\0132\n.TimeRange\022\027\n\014max_versions\030\006" +
+      " \001(\r:\0011\022\032\n\014cache_blocks\030\007 \001(\010:\004true\022\023\n\013s",
+      "tore_limit\030\010 \001(\r\022\024\n\014store_offset\030\t \001(\r\022\035" +
+      "\n\016existence_only\030\n \001(\010:\005false\022!\n\022closest" +
+      "_row_before\030\013 \001(\010:\005false\022)\n\013consistency\030" +
+      "\014 \001(\0162\014.Consistency:\006STRONG\"z\n\006Result\022\023\n" +
+      "\004cell\030\001 \003(\0132\005.Cell\022\035\n\025associated_cell_co" +
+      "unt\030\002 \001(\005\022\016\n\006exists\030\003 \001(\010\022\024\n\005stale\030\004 \001(\010" +
+      ":\005false\022\026\n\007partial\030\005 \001(\010:\005false\"A\n\nGetRe" +
       "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" +
-      "5\n\013family_path\030\002 \003(\0132 .BulkLoadHFileRequ" +
-      "est.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*" +
-      "\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002" +
-      "(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 " +
-      "\002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002" +
-      "(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030" +
-      "\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSer" +
-      "viceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPai",
-      "r\"d\n\031CoprocessorServiceRequest\022 \n\006region" +
-      "\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027" +
-      ".CoprocessorServiceCall\"]\n\032CoprocessorSe" +
-      "rviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSp" +
-      "ecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"{" +
-      "\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001(" +
-      "\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014" +
-      "service_call\030\004 \001(\0132\027.CoprocessorServiceC" +
-      "all\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.Re" +
-      "gionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030",
-      "\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014mem" +
-      "storeLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(" +
-      "\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001(" +
-      "\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception\030" +
-      "\003 \001(\0132\016.NameBytesPair\0221\n\016service_result\030" +
-      "\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tload" +
-      "Stats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022RegionA" +
-      "ctionResult\022-\n\021resultOrException\030\001 \003(\0132\022" +
-      ".ResultOrException\022!\n\texception\030\002 \001(\0132\016." +
-      "NameBytesPair\"f\n\014MultiRequest\022#\n\014regionA",
-      "ction\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup" +
-      "\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S\n" +
-      "\rMultiResponse\022/\n\022regionActionResult\030\001 \003" +
-      "(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 \001" +
-      "(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELIN" +
-      "E\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReques" +
-      "t\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReques" +
-      "t\032\017.MutateResponse\022#\n\004Scan\022\014.ScanRequest" +
-      "\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.BulkL" +
-      "oadHFileRequest\032\026.BulkLoadHFileResponse\022",
-      "F\n\013ExecService\022\032.CoprocessorServiceReque" +
-      "st\032\033.CoprocessorServiceResponse\022R\n\027ExecR" +
-      "egionServerService\022\032.CoprocessorServiceR" +
-      "equest\032\033.CoprocessorServiceResponse\022&\n\005M" +
-      "ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" +
-      "rg.apache.hadoop.hbase.protobuf.generate" +
-      "dB\014ClientProtosH\001\210\001\001\240\001\001"
+      "\021\n\003get\030\002 \002(\0132\004.Get\"&\n\013GetResponse\022\027\n\006res" +
+      "ult\030\001 \001(\0132\007.Result\"\200\001\n\tCondition\022\013\n\003row\030",
+      "\001 \002(\014\022\016\n\006family\030\002 \002(\014\022\021\n\tqualifier\030\003 \002(\014" +
+      "\022\"\n\014compare_type\030\004 \002(\0162\014.CompareType\022\037\n\n" +
+      "comparator\030\005 \002(\0132\013.Comparator\"\265\006\n\rMutati" +
+      "onProto\022\013\n\003row\030\001 \001(\014\0220\n\013mutate_type\030\002 \001(" +
+      "\0162\033.MutationProto.MutationType\0220\n\014column" +
+      "_value\030\003 \003(\0132\032.MutationProto.ColumnValue" +
+      "\022\021\n\ttimestamp\030\004 \001(\004\022!\n\tattribute\030\005 \003(\0132\016" +
+      ".NameBytesPair\022:\n\ndurability\030\006 \001(\0162\031.Mut" +
+      "ationProto.Durability:\013USE_DEFAULT\022\036\n\nti" +
+      "me_range\030\007 \001(\0132\n.TimeRange\022\035\n\025associated",
+      "_cell_count\030\010 \001(\005\022\r\n\005nonce\030\t \001(\004\032\347\001\n\013Col" +
+      "umnValue\022\016\n\006family\030\001 \002(\014\022B\n\017qualifier_va" +
+      "lue\030\002 \003(\0132).MutationProto.ColumnValue.Qu" +
+      "alifierValue\032\203\001\n\016QualifierValue\022\021\n\tquali" +
+      "fier\030\001 \001(\014\022\r\n\005value\030\002 \001(\014\022\021\n\ttimestamp\030\003" +
+      " \001(\004\022.\n\013delete_type\030\004 \001(\0162\031.MutationProt" +
+      "o.DeleteType\022\014\n\004tags\030\005 \001(\014\"W\n\nDurability" +
+      "\022\017\n\013USE_DEFAULT\020\000\022\014\n\010SKIP_WAL\020\001\022\r\n\tASYNC" +
+      "_WAL\020\002\022\014\n\010SYNC_WAL\020\003\022\r\n\tFSYNC_WAL\020\004\">\n\014M" +
+      "utationType\022\n\n\006APPEND\020\000\022\r\n\tINCREMENT\020\001\022\007",
+      "\n\003PUT\020\002\022\n\n\006DELETE\020\003\"p\n\nDeleteType\022\026\n\022DEL" +
+      "ETE_ONE_VERSION\020\000\022\034\n\030DELETE_MULTIPLE_VER" +
+      "SIONS\020\001\022\021\n\rDELETE_FAMILY\020\002\022\031\n\025DELETE_FAM" +
+      "ILY_VERSION\020\003\"\207\001\n\rMutateRequest\022 \n\006regio" +
+      "n\030\001 \002(\0132\020.RegionSpecifier\022 \n\010mutation\030\002 " +
+      "\002(\0132\016.MutationProto\022\035\n\tcondition\030\003 \001(\0132\n" +
+      ".Condition\022\023\n\013nonce_group\030\004 \001(\004\"<\n\016Mutat" +
+      "eResponse\022\027\n\006result\030\001 \001(\0132\007.Result\022\021\n\tpr" +
+      "ocessed\030\002 \001(\010\"\271\003\n\004Scan\022\027\n\006column\030\001 \003(\0132\007" +
+      ".Column\022!\n\tattribute\030\002 \003(\0132\016.NameBytesPa",
+      "ir\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_row\030\004 \001(\014\022" +
+      "\027\n\006filter\030\005 \001(\0132\007.Filter\022\036\n\ntime_range\030\006" +
+      " \001(\0132\n.TimeRange\022\027\n\014max_versions\030\007 \001(\r:\001" +
+      "1\022\032\n\014cache_blocks\030\010 \001(\010:\004true\022\022\n\nbatch_s" +
+      "ize\030\t \001(\r\022\027\n\017max_result_size\030\n \001(\004\022\023\n\013st" +
+      "ore_limit\030\013 \001(\r\022\024\n\014store_offset\030\014 \001(\r\022&\n" +
+      "\036load_column_families_on_demand\030\r \001(\010\022\r\n" +
+      "\005small\030\016 \001(\010\022\027\n\010reversed\030\017 \001(\010:\005false\022)\n" +
+      "\013consistency\030\020 \001(\0162\014.Consistency:\006STRONG" +
+      "\022\017\n\007caching\030\021 \001(\r\"\376\001\n\013ScanRequest\022 \n\006reg",
+      "ion\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(" +
+      "\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_o" +
+      "f_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rn" +
+      "ext_call_seq\030\006 \001(\004\022\037\n\027client_handles_par" +
+      "tials\030\007 \001(\010\022!\n\031client_handles_heartbeats" +
+      "\030\010 \001(\010\022\032\n\022track_scan_metrics\030\t \001(\010\"\210\002\n\014S" +
+      "canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" +
+      "\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" +
+      "\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" +
+      "\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result",
+      "\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031" +
+      "\n\021heartbeat_message\030\t \001(\010\022\"\n\014scan_metric" +
+      "s\030\n \001(\0132\014.ScanMetrics\"\263\001\n\024BulkLoadHFileR" +
+      "equest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" +
+      "\0225\n\013family_path\030\002 \003(\0132 .BulkLoadHFileReq" +
+      "uest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032" +
+      "*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 " +
+      "\002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001" +
+      " \002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 " +
+      "\002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name",
+      "\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSe" +
+      "rviceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPa" +
+      "ir\"d\n\031CoprocessorServiceRequest\022 \n\006regio" +
+      "n\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132" +
+      "\027.CoprocessorServiceCall\"]\n\032CoprocessorS" +
+      "erviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionS" +
+      "pecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"" +
+      "{\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001" +
+      "(\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n" +
+      "\014service_call\030\004 \001(\0132\027.CoprocessorService",
+      "Call\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.R" +
+      "egionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action" +
+      "\030\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014me" +
+      "mstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001" +
+      "(\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001" +
+      "(\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception" +
+      "\030\003 \001(\0132\016.NameBytesPair\0221\n\016service_result" +
+      "\030\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tloa" +
+      "dStats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022Region" +
+      "ActionResult\022-\n\021resultOrException\030\001 \003(\0132",
+      "\022.ResultOrException\022!\n\texception\030\002 \001(\0132\016" +
+      ".NameBytesPair\"f\n\014MultiRequest\022#\n\014region" +
+      "Action\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGrou" +
+      "p\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S" +
+      "\n\rMultiResponse\022/\n\022regionActionResult\030\001 " +
+      "\003(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 " +
+      "\001(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELI" +
+      "NE\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReque" +
+      "st\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReque" +
+      "st\032\017.MutateResponse\022#\n\004Scan\022\014.ScanReques",
+      "t\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.Bulk" +
+      "LoadHFileRequest\032\026.BulkLoadHFileResponse" +
+      "\022F\n\013ExecService\022\032.CoprocessorServiceRequ" +
+      "est\032\033.CoprocessorServiceResponse\022R\n\027Exec" +
+      "RegionServerService\022\032.CoprocessorService" +
+      "Request\032\033.CoprocessorServiceResponse\022&\n\005" +
+      "Multi\022\r.MultiRequest\032\016.MultiResponseBB\n*" +
+      "org.apache.hadoop.hbase.protobuf.generat" +
+      "edB\014ClientProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -33079,13 +33462,13 @@ public final class ClientProtos {
           internal_static_ScanRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_ScanRequest_descriptor,
-              new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", });
+              new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", });
           internal_static_ScanResponse_descriptor =
             getDescriptor().getMessageTypes().get(13);
           internal_static_ScanResponse_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_ScanResponse_descriptor,
-              new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", });
+              new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", "ScanMetrics", });
           internal_static_BulkLoadHFileRequest_descriptor =
             getDescriptor().getMessageTypes().get(14);
           internal_static_BulkLoadHFileRequest_fieldAccessorTable = new
@@ -33180,6 +33563,7 @@ public final class ClientProtos {
           org.apache.hadoop.hbase.protobuf.generated.FilterProtos.getDescriptor(),
           org.apache.hadoop.hbase.protobuf.generated.CellProtos.getDescriptor(),
           org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos.getDescriptor(),
+          org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.getDescriptor(),
         }, assigner);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/dc72dad7/hbase-protocol/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index 3a48cc8..c857c63 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -28,6 +28,7 @@ import "HBase.proto";
 import "Filter.proto";
 import "Cell.proto";
 import "Comparator.proto";
+import "MapReduce.proto";
 
 /**
  * The protocol buffer version of Authorizations.
@@ -276,6 +277,7 @@ message ScanRequest {
   optional uint64 next_call_seq = 6;
   optional bool client_handles_partials = 7;
   optional bool client_handles_heartbeats = 8;
+  optional bool track_scan_metrics = 9;
 }
 
 /**
@@ -314,12 +316,17 @@ message ScanResponse {
   // reasons such as the size in bytes or quantity of results accumulated. This field
   // will true when more results exist in the current region.
   optional bool more_results_in_region = 8;
-
+  
   // This field is filled in if the server is sending back a heartbeat message.
   // Heartbeat messages are sent back to the client to prevent the scanner from
   // timing out. Seeing a heartbeat message communicates to the Client that the
   // server would have continued to scan had the time limit not been reached.
   optional bool heartbeat_message = 9;
+  
+  // This field is filled in if the client has requested that scan metrics be tracked.
+  // The metrics tracked here are sent back to the client to be tracked together with 
+  // the existing client side metrics.
+  optional ScanMetrics scan_metrics = 10;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/dc72dad7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 3994027..827f1d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5417,6 +5417,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
           nextKv = heap.peek();
           moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
+          if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
 
           if (scannerContext.checkBatchLimit(limitScope)) {
             return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
@@ -5490,8 +5491,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // progress should be kept.
         if (scannerContext.getKeepProgress()) {
           // Progress should be kept. Reset to initial values seen at start of method invocation.
-          scannerContext
-              .setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress);
+          scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
+            initialTimeProgress);
         } else {
           scannerContext.clearProgress();
         }
@@ -5556,7 +5557,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // Check if rowkey filter wants to exclude this row. If so, loop to next.
           // Technically, if we hit limits before on this row, we don't need this call.
           if (filterRowKey(currentRow, offset, length)) {
-            boolean moreRows = nextRow(currentRow, offset, length);
+            incrementCountOfRowsFilteredMetric(scannerContext);
+            // Typically the count of rows scanned is incremented inside #populateResult. However,
+            // here we are filtering a row based purely on its row key, preventing us from calling
+            // #populateResult. Thus, increment the rows scanned metric here instead.
+            incrementCountOfRowsScannedMetric(scannerContext);
+            boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
             if (!moreRows) {
               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
             }
@@ -5605,9 +5611,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             }
           }
 
-          if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
+          if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
+            incrementCountOfRowsFilteredMetric(scannerContext);
             results.clear();
-            boolean moreRows = nextRow(currentRow, offset, length);
+            boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
             if (!moreRows) {
               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
             }
@@ -5650,14 +5657,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // Double check to prevent empty rows from appearing in result. It could be
         // the case when SingleColumnValueExcludeFilter is used.
         if (results.isEmpty()) {
-          boolean moreRows = nextRow(currentRow, offset, length);
+          incrementCountOfRowsFilteredMetric(scannerContext);
+          boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
           if (!moreRows) {
             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }
           if (!stopRow) continue;
         }
 
-        // We are done. Return the result.
         if (stopRow) {
           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
         } else {
@@ -5666,6 +5673,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
 
+    protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
+      if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
+
+      scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
+    }
+
+    protected void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
+      if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
+
+      scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
+    }
+
     /**
      * @param currentRow
      * @param offset
@@ -5712,7 +5731,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           && filter.filterRowKey(row, offset, length);
     }
 
-    protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
+    protected boolean nextRow(ScannerContext scannerContext, byte[] currentRow, int offset,
+        short length) throws IOException {
       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
       Cell next;
       while ((next = this.storeHeap.peek()) != null &&
@@ -5720,6 +5740,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         this.storeHeap.next(MOCKED_LIST);
       }
       resetFilters();
+
       // Calling the hook in CP which allows it to do a fast forward
       return this.region.getCoprocessorHost() == null
           || this.region.getCoprocessorHost()

http://git-wip-us.apache.org/repos/asf/hbase/blob/dc72dad7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
index 66ed6c0..3e0d7e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 public class NoLimitScannerContext extends ScannerContext {
 
   public NoLimitScannerContext() {
-    super(false, null);
+    super(false, null, false);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/dc72dad7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 6491a5c..07a7e83 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeSet;
@@ -141,9 +142,11 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResul
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -2334,12 +2337,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                 final LimitScope timeScope =
                     allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
 
+                boolean trackMetrics =
+                    request.hasTrackScanMetrics() && request.getTrackScanMetrics();
+
                 // Configure with limits for this RPC. Set keep progress true since size progress
                 // towards size limit should be kept between calls to nextRaw
                 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
                 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
                 contextBuilder.setBatchLimit(scanner.getBatch());
                 contextBuilder.setTimeLimit(timeScope, timeLimit);
+                contextBuilder.setTrackMetrics(trackMetrics);
                 ScannerContext scannerContext = contextBuilder.build();
 
                 boolean limitReached = false;
@@ -2393,6 +2400,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                   // We didn't get a single batch
                   builder.setMoreResultsInRegion(false);
                 }
+
+                // Check to see if the client requested that we track metrics server side. If the
+                // client requested metrics, retrieve the metrics from the scanner context.
+                if (trackMetrics) {
+                  Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
+                  ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
+                  NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
+
+                  for (Entry<String, Long> entry : metrics.entrySet()) {
+                    pairBuilder.setName(entry.getKey());
+                    pairBuilder.setValue(entry.getValue());
+                    metricBuilder.addMetrics(pairBuilder.build());
+                  }
+
+                  builder.setScanMetrics(metricBuilder.build());
+                }
               }
               region.updateReadRequestsCount(i);
               region.getMetrics().updateScanNext(totalCellSize);

http://git-wip-us.apache.org/repos/asf/hbase/blob/dc72dad7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
index 81c14e5..3b2a8bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 
@@ -63,13 +63,14 @@ class ReversedRegionScannerImpl extends RegionScannerImpl {
   }
 
   @Override
-  protected boolean nextRow(byte[] currentRow, int offset, short length)
-      throws IOException {
+  protected boolean nextRow(ScannerContext scannerContext, byte[] currentRow, int offset,
+      short length) throws IOException {
     assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
     byte row[] = new byte[length];
     System.arraycopy(currentRow, offset, row, 0, length);
     this.storeHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(row));
     resetFilters();
+
     // Calling the hook in CP which allows it to do a fast forward
     if (this.region.getCoprocessorHost() != null) {
       return this.region.getCoprocessorHost().postScannerFilterRow(this,

http://git-wip-us.apache.org/repos/asf/hbase/blob/dc72dad7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 8dfd0f4..a927789 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
 
 /**
  * ScannerContext instances encapsulate limit tracking AND progress towards those limits during
@@ -96,7 +97,12 @@ public class ScannerContext {
   boolean keepProgress;
   private static boolean DEFAULT_KEEP_PROGRESS = false;
 
-  ScannerContext(boolean keepProgress, LimitFields limitsToCopy) {
+  /**
+   * Tracks the relevant server side metrics during scans. Null when metrics are not tracked.
+   */
+  final ServerSideScanMetrics metrics;
+
+  ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) {
     this.limits = new LimitFields();
     if (limitsToCopy != null) this.limits.copy(limitsToCopy);
 
@@ -105,6 +111,21 @@ public class ScannerContext {
 
     this.keepProgress = keepProgress;
     this.scannerState = DEFAULT_STATE;
+    this.metrics = trackMetrics ? new ServerSideScanMetrics() : null;
+  }
+
+  boolean isTrackingMetrics() {
+    return this.metrics != null;
+  }
+
+  /**
+   * Get the metrics instance. Should only be called after a call to {@link #isTrackingMetrics()}
+   * has been made to confirm that metrics are indeed being tracked.
+   * @return {@link ServerSideScanMetrics} instance that is tracking metrics for this scan
+   */
+  ServerSideScanMetrics getMetrics() {
+    assert isTrackingMetrics();
+    return this.metrics;
   }
 
   /**
@@ -331,6 +352,7 @@ public class ScannerContext {
 
   public static final class Builder {
     boolean keepProgress = DEFAULT_KEEP_PROGRESS;
+    boolean trackMetrics = false;
     LimitFields limits = new LimitFields();
 
     private Builder() {
@@ -345,6 +367,11 @@ public class ScannerContext {
       return this;
     }
 
+    public Builder setTrackMetrics(boolean trackMetrics) {
+      this.trackMetrics = trackMetrics;
+      return this;
+    }
+
     public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) {
       limits.setSize(sizeLimit);
       limits.setSizeScope(sizeScope);
@@ -363,7 +390,7 @@ public class ScannerContext {
     }
 
     public ScannerContext build() {
-      return new ScannerContext(keepProgress, limits);
+      return new ScannerContext(keepProgress, limits, trackMetrics);
     }
   }