You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/05/26 16:52:01 UTC

phoenix git commit: PHOENIX-3248 Enable HBase server-side scan metrics to be returned to client and surfaced through metrics (Karan Mehta)

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 92d8cf271 -> b553f96a2


PHOENIX-3248 Enable HBase server-side scan metrics to be returned to client and surfaced through metrics (Karan Mehta)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b553f96a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b553f96a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b553f96a

Branch: refs/heads/4.x-HBase-1.1
Commit: b553f96a21b603d02083cd1c32fca95036a80bb5
Parents: 92d8cf2
Author: Samarth Jain <sa...@apache.org>
Authored: Fri May 26 09:51:57 2017 -0700
Committer: Samarth Jain <sa...@apache.org>
Committed: Fri May 26 09:51:57 2017 -0700

----------------------------------------------------------------------
 .../DelayedTableResultIteratorFactory.java      | 18 +++--
 .../phoenix/monitoring/PhoenixMetricsIT.java    | 35 ++++-----
 .../phoenix/iterate/ChunkedResultIterator.java  | 11 +--
 .../DefaultTableResultIteratorFactory.java      | 12 ++--
 .../phoenix/iterate/ParallelIterators.java      | 11 ++-
 .../phoenix/iterate/ScanningResultIterator.java | 76 +++++++++++++-------
 .../apache/phoenix/iterate/SerialIterators.java | 13 +++-
 .../phoenix/iterate/TableResultIterator.java    | 11 +--
 .../iterate/TableResultIteratorFactory.java     |  6 +-
 .../phoenix/mapreduce/PhoenixRecordReader.java  | 16 +++--
 .../phoenix/monitoring/GlobalClientMetrics.java |  7 +-
 .../apache/phoenix/monitoring/MetricType.java   | 17 ++++-
 .../phoenix/monitoring/ReadMetricQueue.java     |  4 ++
 .../hive/mapreduce/PhoenixRecordReader.java     | 11 +--
 14 files changed, 158 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
index 55bed91..5e13982 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
@@ -26,7 +26,7 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 
@@ -39,14 +39,18 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac
     }
     
     @Override
-    public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan,
-            CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
-        return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
+    public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
+            Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
+            QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+        return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetricsHolder,
+                renewLeaseThreshold, plan, scanGrouper);
     }
-    
+
     private class DelayedTableResultIterator extends TableResultIterator {
-        public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
-            super(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
+        public DelayedTableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan,
+                ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan,
+                ParallelScanGrouper scanGrouper) throws SQLException {
+            super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper);
         }
         
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 2838f04..69ad1ff 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -76,12 +76,14 @@ import com.google.common.collect.Sets;
 
 public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
 
-    private static final List<String> mutationMetricsToSkip = Lists
-            .newArrayList(MetricType.MUTATION_COMMIT_TIME.name());
-    private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(),
-            MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name());
+    private static final List<String> mutationMetricsToSkip =
+            Lists.newArrayList(MetricType.MUTATION_COMMIT_TIME.name());
+    private static final List<String> readMetricsToSkip =
+            Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(),
+                MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name(),
+                MetricType.COUNT_MILLS_BETWEEN_NEXTS.name());
     private static final String CUSTOM_URL_STRING = "SESSION";
-    private static final AtomicInteger numConnections = new AtomicInteger(0); 
+    private static final AtomicInteger numConnections = new AtomicInteger(0);
 
     @BeforeClass
     public static void doSetup() throws Exception {
@@ -230,7 +232,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         resultSetBeingTested.close();
         Set<String> expectedTableNames = Sets.newHashSet(tableName);
         assertReadMetricValuesForSelectSql(Lists.newArrayList(numRows), Lists.newArrayList(numExpectedTasks),
-                resultSetBeingTested, expectedTableNames);
+            resultSetBeingTested, expectedTableNames);
     }
 
     @Test
@@ -617,7 +619,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index3), mutationMetricsToSkip);
         }
     }
-    
+
     @Test
     public void testOpenConnectionsCounter() throws Exception {
         long numOpenConnections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue();
@@ -626,7 +628,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         }
         assertEquals(numOpenConnections, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue());
     }
-    
+
     private void createTableAndInsertValues(boolean commit, int numRows, Connection conn, String tableName)
             throws SQLException {
         String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
@@ -684,20 +686,14 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             String tableName = entry.getKey();
             expectedTableNames.remove(tableName);
             Map<String, Long> metricValues = entry.getValue();
-            boolean scanMetricsPresent = false;
             boolean taskCounterMetricsPresent = false;
             boolean taskExecutionTimeMetricsPresent = false;
             boolean memoryMetricsPresent = false;
             for (Entry<String, Long> pair : metricValues.entrySet()) {
                 String metricName = pair.getKey();
                 long metricValue = pair.getValue();
-                long n = numRows.get(counter);
                 long numTask = numExpectedTasks.get(counter);
-                if (metricName.equals(SCAN_BYTES.name())) {
-                    // we are using a SCAN_BYTES_DELTA of 1. So number of scan bytes read should be number of rows read
-                    assertEquals(n, metricValue);
-                    scanMetricsPresent = true;
-                } else if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
+                if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
                     assertEquals(numTask, metricValue);
                     taskCounterMetricsPresent = true;
                 } else if (metricName.equals(TASK_EXECUTION_TIME.name())) {
@@ -709,7 +705,6 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
                 }
             }
             counter++;
-            assertTrue(scanMetricsPresent);
             assertTrue(taskCounterMetricsPresent);
             assertTrue(taskExecutionTimeMetricsPresent);
             assertTrue(memoryMetricsPresent);
@@ -822,7 +817,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             }
         }
     }
-    
+
     @Test
     public void testGetConnectionsForSameUrlConcurrently()  throws Exception {
         // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
@@ -940,7 +935,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             exec.shutdownNow();
         }
     }
-    
+
     @Test
     public void testGetConnectionsWithDifferentJDBCParamsConcurrently()  throws Exception {
         DriverManager.registerDriver(PhoenixDriver.INSTANCE);
@@ -977,9 +972,9 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
                     c.close();
                 } catch (Exception ignore) {}
             }
-        } 
+        }
     }
-    
+
     private static class GetConnectionCallable implements Callable<Connection> {
         private final String url;
         GetConnectionCallable(String url) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index a12d40c..8595fd4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -19,7 +19,6 @@
 package org.apache.phoenix.iterate;
 
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
-import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 
 import java.sql.SQLException;
 import java.util.List;
@@ -30,6 +29,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
@@ -144,12 +145,14 @@ public class ChunkedResultIterator implements PeekingResultIterator {
             }
             if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
             String tableName = tableRef.getTable().getPhysicalName().getString();
+            ReadMetricQueue readMetrics = context.getReadMetricsQueue();
+            ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
+                readMetrics.isRequestMetricsEnabled());
             long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
             ResultIterator singleChunkResultIterator =
                     new SingleChunkResultIterator(new TableResultIterator(mutationState, scan,
-                            context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName),
-                            renewLeaseThreshold, plan, DefaultParallelScanGrouper.getInstance()),
-                            chunkSize);
+                            scanMetricsHolder, renewLeaseThreshold, plan,
+                            DefaultParallelScanGrouper.getInstance()), chunkSize);
             resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan);
         }
         return resultIterator;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
index b720b56..976b839 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
@@ -22,15 +22,17 @@ import java.sql.SQLException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.TableRef;
 
 public class DefaultTableResultIteratorFactory implements TableResultIteratorFactory {
 
-    @Override
-    public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan,
-            CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
-        return new TableResultIterator(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
+       @Override
+    public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
+            Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
+            QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+        return new TableResultIterator(mutationState, scan, scanMetricsHolder, renewLeaseThreshold,
+                plan, scanGrouper);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 8c9b689..f0360e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.job.JobManager.JobCallable;
-import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.LogUtil;
@@ -97,11 +97,16 @@ public class ParallelIterators extends BaseResultIterators {
         context.getOverallQueryMetrics().updateNumParallelScans(numScans);
         GLOBAL_NUM_PARALLEL_SCANS.update(numScans);
         final long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+        boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
         for (final ScanLocator scanLocation : scanLocations) {
             final Scan scan = scanLocation.getScan();
-            final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
+            final ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, physicalTableName,
+                scan, isRequestMetricsEnabled);
             final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
-            final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
+            final TableResultIterator tableResultItr =
+                    context.getConnection().getTableResultIteratorFactory().newIterator(
+                        mutationState, tableRef, scan, scanMetricsHolder, renewLeaseThreshold, plan,
+                        scanGrouper);
             context.getConnection().addIteratorForLeaseRenewal(tableResultItr);
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 7f865ed..8ee00e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -22,33 +22,74 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTE
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Map;
 
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.phoenix.monitoring.CombinableMetric;
-import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
-import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ServerUtil;
 
 public class ScanningResultIterator implements ResultIterator {
     private final ResultScanner scanner;
-    private final CombinableMetric scanMetrics;
+    private final Scan scan;
+    private final ScanMetricsHolder scanMetricsHolder;
+    boolean scanMetricsUpdated;
+    boolean scanMetricsEnabled;
 
-    public ScanningResultIterator(ResultScanner scanner, CombinableMetric scanMetrics) {
+    // These are the names HBase uses to refer to these metrics.
+    // HBase declares them as static final strings, so the same values are reused here.
+    static final String RPC_CALLS_METRIC_NAME = "RPC_CALLS";
+    static final String REMOTE_RPC_CALLS_METRIC_NAME = "REMOTE_RPC_CALLS";
+    static final String MILLIS_BETWEEN_NEXTS_METRIC_NAME = "MILLIS_BETWEEN_NEXTS";
+    static final String NOT_SERVING_REGION_EXCEPTION_METRIC_NAME = "NOT_SERVING_REGION_EXCEPTION";
+    static final String BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS";
+    static final String BYTES_IN_REMOTE_RESULTS_METRIC_NAME = "BYTES_IN_REMOTE_RESULTS";
+    static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED";
+    static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES";
+    static final String REMOTE_RPC_RETRIES_METRIC_NAME = "REMOTE_RPC_RETRIES";
+    static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED";
+    static final String COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME = "ROWS_FILTERED";
+    static final String GLOBAL_BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS";
+
+    public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder) {
         this.scanner = scanner;
-        this.scanMetrics = scanMetrics;
+        this.scan = scan;
+        this.scanMetricsHolder = scanMetricsHolder;
+        scanMetricsUpdated = false;
+        scanMetricsEnabled = scan.isScanMetricsEnabled();
     }
 
     @Override
     public void close() throws SQLException {
+        getScanMetrics();
         scanner.close();
     }
 
+    private void getScanMetrics() {
+
+        if (!scanMetricsUpdated && scanMetricsEnabled) {
+            Map<String, Long> scanMetricsMap = scan.getScanMetrics().getMetricsMap();
+            scanMetricsHolder.getCountOfRPCcalls().change(scanMetricsMap.get(RPC_CALLS_METRIC_NAME));
+            scanMetricsHolder.getCountOfRemoteRPCcalls().change(scanMetricsMap.get(REMOTE_RPC_CALLS_METRIC_NAME));
+            scanMetricsHolder.getSumOfMillisSecBetweenNexts().change(scanMetricsMap.get(MILLIS_BETWEEN_NEXTS_METRIC_NAME));
+            scanMetricsHolder.getCountOfNSRE().change(scanMetricsMap.get(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME));
+            scanMetricsHolder.getCountOfBytesInResults().change(scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
+            scanMetricsHolder.getCountOfBytesInRemoteResults().change(scanMetricsMap.get(BYTES_IN_REMOTE_RESULTS_METRIC_NAME));
+            scanMetricsHolder.getCountOfRegions().change(scanMetricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+            scanMetricsHolder.getCountOfRPCRetries().change(scanMetricsMap.get(RPC_RETRIES_METRIC_NAME));
+            scanMetricsHolder.getCountOfRemoteRPCRetries().change(scanMetricsMap.get(REMOTE_RPC_RETRIES_METRIC_NAME));
+            scanMetricsHolder.getCountOfRowsScanned().change(scanMetricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+            scanMetricsHolder.getCountOfRowsFiltered().change(scanMetricsMap.get(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME));
+
+            GLOBAL_SCAN_BYTES.update(scanMetricsMap.get(GLOBAL_BYTES_IN_RESULTS_METRIC_NAME));
+            scanMetricsUpdated = true;
+        }
+
+    }
+
     @Override
     public Tuple next() throws SQLException {
         try {
@@ -57,7 +98,6 @@ public class ScanningResultIterator implements ResultIterator {
                 close(); // Free up resources early
                 return null;
             }
-            calculateScanSize(result);
             // TODO: use ResultTuple.setResult(result)?
             // Need to create a new one if holding on to it (i.e. OrderedResultIterator)
             return new ResultTuple(result);
@@ -75,22 +115,6 @@ public class ScanningResultIterator implements ResultIterator {
         return "ScanningResultIterator [scanner=" + scanner + "]";
     }
 
-    private void calculateScanSize(Result result) {
-        if (GlobalClientMetrics.isMetricsEnabled() || scanMetrics != NoOpRequestMetric.INSTANCE) {
-            if (result != null) {
-                Cell[] cells = result.rawCells();
-                long scanResultSize = 0;
-                for (Cell cell : cells) {
-                    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-                    scanResultSize += kv.heapSize();
-                }
-                scanMetrics.change(scanResultSize);
-                GLOBAL_SCAN_BYTES.update(scanResultSize);
-            }
-        }
-    }
-
-
     public ResultScanner getScanner() {
         return scanner;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index d8f7f40..eb0c949 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
-
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
@@ -33,6 +31,8 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.QueryConstants;
@@ -168,12 +168,19 @@ public class SerialIterators extends BaseResultIterators {
             if (index >= scans.size()) {
                 return EMPTY_ITERATOR;
             }
+            ReadMetricQueue readMetrics = context.getReadMetricsQueue();
+            boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
             while (index < scans.size()) {
                 Scan currentScan = scans.get(index++);
                 if (remainingOffset != null) {
                     currentScan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, PInteger.INSTANCE.toBytes(remainingOffset));
                 }
-                TableResultIterator itr = new TableResultIterator(mutationState, currentScan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold, plan, scanGrouper);
+                ScanMetricsHolder scanMetricsHolder =
+                        ScanMetricsHolder.getInstance(readMetrics, tableName, currentScan,
+                            isRequestMetricsEnabled);
+                TableResultIterator itr =
+                        new TableResultIterator(mutationState, currentScan, scanMetricsHolder,
+                                renewLeaseThreshold, plan, scanGrouper);
                 PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName, plan);
                 Tuple tuple;
                 if ((tuple = peekingItr.peek()) == null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index c6fcc1d..f854996 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -64,7 +65,7 @@ import com.google.common.annotations.VisibleForTesting;
 public class TableResultIterator implements ResultIterator {
     private final Scan scan;
     private final HTableInterface htable;
-    private final CombinableMetric scanMetrics;
+    private final ScanMetricsHolder scanMetricsHolder;
     private static final ResultIterator UNINITIALIZED_SCANNER = ResultIterator.EMPTY_ITERATOR;
     private final long renewLeaseThreshold;
     private final QueryPlan plan;
@@ -85,7 +86,7 @@ public class TableResultIterator implements ResultIterator {
 
     @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
     TableResultIterator() {
-        this.scanMetrics = null;
+        this.scanMetricsHolder = null;
         this.renewLeaseThreshold = 0;
         this.htable = null;
         this.scan = null;
@@ -97,10 +98,10 @@ public class TableResultIterator implements ResultIterator {
         RENEWED, NOT_RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, LOCK_NOT_ACQUIRED, NOT_SUPPORTED
     };
 
-    public TableResultIterator(MutationState mutationState, Scan scan, CombinableMetric scanMetrics,
+    public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
             long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
         this.scan = scan;
-        this.scanMetrics = scanMetrics;
+        this.scanMetricsHolder = scanMetricsHolder;
         this.plan = plan;
         PTable table = plan.getTableRef().getTable();
         htable = mutationState.getHTable(table);
@@ -186,7 +187,7 @@ public class TableResultIterator implements ResultIterator {
             if (delegate == UNINITIALIZED_SCANNER) {
                 try {
                     this.scanIterator =
-                            new ScanningResultIterator(htable.getScanner(scan), scanMetrics);
+                            new ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder);
                 } catch (IOException e) {
                     Closeables.closeQuietly(htable);
                     throw ServerUtil.parseServerException(e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
index 8d7b54d..c23e342 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
@@ -23,8 +23,12 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.TableRef;
 
 public interface TableResultIteratorFactory {
-    public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException;        
+    public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
+            Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
+            QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException;
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 17d9b6a..d4d6734 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
-
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
@@ -47,13 +45,13 @@ import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
+import org.apache.phoenix.query.ConnectionQueryServices;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 
-import org.apache.phoenix.query.ConnectionQueryServices;
-
 /**
  * {@link RecordReader} implementation that iterates over the the records.
  */
@@ -119,10 +117,18 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
             services.clearTableRegionCache(tableNameBytes);
 
             long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+            boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
             for (Scan scan : scans) {
                 // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
                 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
-                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext().getConnection().getMutationState(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance());
+                ScanMetricsHolder scanMetricsHolder =
+                        ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
+                            isRequestMetricsEnabled);
+                final TableResultIterator tableResultIterator =
+                        new TableResultIterator(
+                                queryPlan.getContext().getConnection().getMutationState(), scan,
+                                scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
+                                MapReduceParallelScanGrouper.getInstance());
                 PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
                 iterators.add(peekingResultIterator);
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index b5f9422..6ba677a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -18,8 +18,6 @@
 package org.apache.phoenix.monitoring;
 
 import static org.apache.phoenix.monitoring.MetricType.HCONNECTIONS_COUNTER;
-import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
-import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
@@ -27,10 +25,11 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_SERVICES_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
-import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_COUNTER;
@@ -41,6 +40,8 @@ import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
 import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 7b21de5..a18d4ce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -46,8 +46,19 @@ public enum MetricType {
     HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver"),
     PHOENIX_CONNECTIONS_THROTTLED_COUNTER("Number of client Phoenix connections prevented from opening " +
                                               "because there are already too many to that target cluster."),
-    PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("Number of requests for Phoenix connections, whether successful or not.");
-    
+    PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("Number of requests for Phoenix connections, whether successful or not."),
+    COUNT_RPC_CALLS("Number of RPC calls"),
+    COUNT_REMOTE_RPC_CALLS("Number of remote RPC calls"),
+    COUNT_MILLS_BETWEEN_NEXTS("Sum of milliseconds between sequential next calls"),
+    COUNT_NOT_SERVING_REGION_EXCEPTION("Number of NotServingRegionException caught"),
+    COUNT_BYTES_REGION_SERVER_RESULTS("Number of bytes in Result objects from region servers"),
+    COUNT_BYTES_IN_REMOTE_RESULTS("Number of bytes in Result objects from remote region servers"),
+    COUNT_SCANNED_REGIONS("Number of regions scanned"),
+    COUNT_RPC_RETRIES("Number of RPC retries"),
+    COUNT_REMOTE_RPC_RETRIES("Number of remote RPC retries"),
+    COUNT_ROWS_SCANNED("Number of rows scanned"),
+    COUNT_ROWS_FILTERED("Number of rows filtered");
+
     private final String description;
 
     private MetricType(String description) {
@@ -56,6 +67,6 @@ public enum MetricType {
 
     public String description() {
         return description;
-    }
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
index e6c6be2..0e985ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
@@ -177,4 +177,8 @@ public class ReadMetricQueue {
         return q;
     }
 
+	public boolean isRequestMetricsEnabled() {
+		return isRequestMetricsEnabled;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
index ca27686..c10c4d1 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.hive.mapreduce;
 
-import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
-
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
@@ -49,6 +47,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
@@ -113,12 +112,14 @@ public class PhoenixRecordReader<T extends DBWritable> implements
             String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
             long renewScannerLeaseThreshold = queryPlan.getContext().getConnection()
                     .getQueryServices().getRenewLeaseThresholdMilliSeconds();
+            boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
             for (Scan scan : scans) {
                 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes
                         .toBytes(true));
-                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan
-                        .getContext().getConnection().getMutationState(), scan,
-                        readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance()	);
+                ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan, isRequestMetricsEnabled);
+                final TableResultIterator tableResultIterator = new TableResultIterator(
+                        queryPlan.getContext().getConnection().getMutationState(), scan, scanMetricsHolder,
+                        renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance());
 
                 PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap
                         (tableResultIterator);