Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/06/27 01:29:03 UTC

[1/4] phoenix git commit: PHOENIX-1819 Build a framework to capture and report phoenix client side request level metrics

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 006aec5a3 -> 7e29d57bd


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 7004f3c..18f914e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -27,6 +27,7 @@ import java.io.Reader;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.util.ArrayList;
@@ -34,6 +35,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
@@ -61,8 +63,9 @@ import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
-import org.apache.phoenix.monitoring.Metric;
-import org.apache.phoenix.monitoring.PhoenixMetrics;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.phoenix.monitoring.GlobalMetric;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -142,7 +145,7 @@ public class PhoenixRuntime {
     public static final String ANNOTATION_ATTRIB_PREFIX = "phoenix.annotation.";
 
     /**
-     * Use this connection property to explicity enable or disable auto-commit on a new connection.
+     * Use this connection property to explicitly enable or disable auto-commit on a new connection.
      */
     public static final String AUTO_COMMIT_ATTRIB = "AutoCommit";
 
@@ -152,6 +155,11 @@ public class PhoenixRuntime {
      * upserting data into them, and getting the uncommitted state through {@link #getUncommittedData(Connection)}
      */
     public final static String CONNECTIONLESS = "none";
+    
+    /**
+     * Use this connection property to explicitly enable or disable request-level metric collection.
+     */
+    public static final String REQUEST_METRIC_ATTRIB = "RequestMetric";
 
     private static final String HEADER_IN_LINE = "in-line";
     private static final String SQL_FILE_EXT = ".sql";
@@ -980,9 +988,162 @@ public class PhoenixRuntime {
     }
     
     /**
-     * Exposes the various internal phoenix metrics. 
+     * Exposes the various internal Phoenix metrics collected at the client JVM level.
+     */
+    public static Collection<GlobalMetric> getGlobalPhoenixClientMetrics() {
+        return GlobalClientMetrics.getMetrics();
+    }
+    
+    /**
+     * 
+     * @return whether or not the global client metrics are being collected
      */
-    public static Collection<Metric> getInternalPhoenixMetrics() {
-        return PhoenixMetrics.getMetrics();
+    public static boolean areGlobalClientMetricsBeingCollected() {
+        return GlobalClientMetrics.isMetricsEnabled();
     }
-}
+    
+    /**
+     * Method to expose the metrics associated with performing reads using the passed result set. A typical pattern is:
+     * 
+     * <pre>
+     * {@code
+     * Map<String, Map<String, Long>> overAllQueryMetrics = null;
+     * Map<String, Map<String, Long>> requestReadMetrics = null;
+     * try (ResultSet rs = stmt.executeQuery()) {
+     *    while(rs.next()) {
+     *      .....
+     *    }
+     *    overAllQueryMetrics = PhoenixRuntime.getOverAllReadRequestMetrics(rs);
+     *    requestReadMetrics = PhoenixRuntime.getRequestReadMetrics(rs);
+     *    PhoenixRuntime.resetMetrics(rs);
+     * }
+     * }</pre>
+     * 
+     * @param rs
+     *            result set to get the metrics for
+     * @return a map of (table name) -> (map of (metric name) -> (metric value))
+     * @throws SQLException
+     */
+    public static Map<String, Map<String, Long>> getRequestReadMetrics(ResultSet rs) throws SQLException {
+        PhoenixResultSet resultSet = rs.unwrap(PhoenixResultSet.class);
+        return resultSet.getReadMetrics();
+    }
+
+    /**
+     * Method to expose the overall metrics associated with executing a query via Phoenix. A typical pattern for
+     * accessing request-level read metrics and overall read query metrics is:
+     * 
+     * <pre>
+     * {@code
+     * Map<String, Map<String, Long>> overAllQueryMetrics = null;
+     * Map<String, Map<String, Long>> requestReadMetrics = null;
+     * try (ResultSet rs = stmt.executeQuery()) {
+     *    while(rs.next()) {
+     *      .....
+     *    }
+     *    overAllQueryMetrics = PhoenixRuntime.getOverAllReadRequestMetrics(rs);
+     *    requestReadMetrics = PhoenixRuntime.getRequestReadMetrics(rs);
+     *    PhoenixRuntime.resetMetrics(rs);
+     * }
+     * }</pre>
+     * 
+     * @param rs
+     *            result set to get the metrics for
+     * @return a map of metric name -> metric value
+     * @throws SQLException
+     */
+    public static Map<String, Long> getOverAllReadRequestMetrics(ResultSet rs) throws SQLException {
+        PhoenixResultSet resultSet = rs.unwrap(PhoenixResultSet.class);
+        return resultSet.getOverAllRequestReadMetrics();
+    }
+
+    /**
+     * Method to expose the metrics associated with sending mutations over to HBase. These metrics are updated when
+     * commit is called on the passed connection. Mutation metrics are accumulated for the connection until
+     * {@link #resetMetrics(Connection)} is called or the connection is closed. Example usage:
+     * 
+     * <pre>
+     * {@code
+     * Map<String, Map<String, Long>> mutationWriteMetrics = null;
+     * Map<String, Map<String, Long>> mutationReadMetrics = null;
+     * try (Connection conn = DriverManager.getConnection(url)) {
+     *    conn.createStatement.executeUpdate(dml1);
+     *    ....
+     *    conn.createStatement.executeUpdate(dml2);
+     *    ...
+     *    conn.createStatement.executeUpdate(dml3);
+     *    ...
+     *    conn.commit();
+     *    mutationWriteMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+     *    mutationReadMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(conn);
+     *    PhoenixRuntime.resetMetrics(conn);
+     * }
+     * }</pre>
+     *  
+     * @param conn
+     *            connection to get the metrics for
+     * @return a map of (table name) -> (map of (metric name) -> (metric value))
+     * @throws SQLException
+     */
+    public static Map<String, Map<String, Long>> getWriteMetricsForMutationsSinceLastReset(Connection conn) throws SQLException {
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        return pConn.getMutationMetrics();
+    }
+
+    /**
+     * Method to expose the read metrics associated with executing a DML statement. These metrics are updated when
+     * commit is called on the passed connection. Read metrics are accumulated until {@link #resetMetrics(Connection)} is
+     * called or the connection is closed. Example usage:
+     * 
+     * <pre>
+     * {@code
+     * Map<String, Map<String, Long>> mutationWriteMetrics = null;
+     * Map<String, Map<String, Long>> mutationReadMetrics = null;
+     * try (Connection conn = DriverManager.getConnection(url)) {
+     *    conn.createStatement.executeUpdate(dml1);
+     *    ....
+     *    conn.createStatement.executeUpdate(dml2);
+     *    ...
+     *    conn.createStatement.executeUpdate(dml3);
+     *    ...
+     *    conn.commit();
+     *    mutationWriteMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+     *    mutationReadMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(conn);
+     *    PhoenixRuntime.resetMetrics(conn);
+     * }
+     * }</pre>
+     * @param conn
+     *            connection to get the metrics for
+     * @return  a map of (table name) -> (map of (metric name) -> (metric value))
+     * @throws SQLException
+     */
+    public static Map<String, Map<String, Long>> getReadMetricsForMutationsSinceLastReset(Connection conn) throws SQLException {
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        return pConn.getReadMetrics();
+    }
+
+    /**
+     * Reset the read metrics collected in the result set.
+     * 
+     * @see {@link #getRequestReadMetrics(ResultSet)} {@link #getOverAllReadRequestMetrics(ResultSet)}
+     * @param rs
+     * @throws SQLException
+     */
+    public static void resetMetrics(ResultSet rs) throws SQLException {
+        PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+        prs.resetMetrics();
+    }
+    
+    /**
+     * Reset the mutation and reads-for-mutations metrics collected in the connection.
+     * 
+     * @see {@link #getReadMetricsForMutationsSinceLastReset(Connection)} {@link #getWriteMetricsForMutationsSinceLastReset(Connection)}
+     * @param conn
+     * @throws SQLException
+     */
+    public static void resetMetrics(Connection conn) throws SQLException {
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        pConn.clearMetrics();
+    }
+    
+ }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
index ab6a4a7..5ae1a56 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.memory.DelegatingMemoryManager;
 import org.apache.phoenix.memory.GlobalMemoryManager;
 import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.monitoring.MemoryMetricsHolder;
+import org.apache.phoenix.monitoring.SpoolingMetricsHolder;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -52,7 +54,7 @@ public class SpoolingResultIteratorTest {
             };
 
         MemoryManager memoryManager = new DelegatingMemoryManager(new GlobalMemoryManager(threshold, 0));
-        ResultIterator scanner = new SpoolingResultIterator(iterator, memoryManager, threshold, maxSizeSpool,"/tmp");
+        ResultIterator scanner = new SpoolingResultIterator(SpoolingMetricsHolder.NO_OP_INSTANCE, MemoryMetricsHolder.NO_OP_INSTANCE, iterator, memoryManager, threshold, maxSizeSpool,"/tmp");
         AssertResults.assertResults(scanner, expectedResults);
     }
 


[4/4] phoenix git commit: PHOENIX-1819 Build a framework to capture and report phoenix client side request level metrics

Posted by sa...@apache.org.
PHOENIX-1819 Build a framework to capture and report phoenix client side request level metrics


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7e29d57b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7e29d57b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7e29d57b

Branch: refs/heads/4.x-HBase-0.98
Commit: 7e29d57bda0eb66b1dbd102060253e1b1468f052
Parents: 006aec5
Author: Samarth <sa...@salesforce.com>
Authored: Fri Jun 26 16:28:50 2015 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Fri Jun 26 16:28:50 2015 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/PhoenixMetricsIT.java       | 147 ----
 .../apache/phoenix/execute/PartialCommitIT.java |   1 +
 .../phoenix/monitoring/PhoenixMetricsIT.java    | 815 +++++++++++++++++++
 .../apache/phoenix/cache/ServerCacheClient.java |   7 +
 .../apache/phoenix/compile/DeleteCompiler.java  |  50 +-
 .../MutatingParallelIteratorFactory.java        |  51 +-
 .../phoenix/compile/StatementContext.java       |  49 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |  80 +-
 .../apache/phoenix/execute/AggregatePlan.java   |   8 +-
 .../apache/phoenix/execute/HashJoinPlan.java    |   7 +
 .../apache/phoenix/execute/MutationState.java   | 290 ++++---
 .../org/apache/phoenix/execute/UnionPlan.java   |   8 +-
 .../phoenix/iterate/BaseResultIterators.java    |  15 +-
 .../phoenix/iterate/ChunkedResultIterator.java  |  21 +-
 .../iterate/ParallelIteratorFactory.java        |   4 +-
 .../phoenix/iterate/ParallelIterators.java      |  25 +-
 .../iterate/RoundRobinResultIterator.java       |   4 +-
 .../phoenix/iterate/ScanningResultIterator.java |  38 +-
 .../apache/phoenix/iterate/SerialIterators.java |  23 +-
 .../phoenix/iterate/SpoolingResultIterator.java |  49 +-
 .../phoenix/iterate/TableResultIterator.java    |  17 +-
 .../phoenix/iterate/UnionResultIterators.java   |  70 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  28 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |  21 +-
 .../apache/phoenix/jdbc/PhoenixResultSet.java   |  48 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  20 +-
 .../java/org/apache/phoenix/job/JobManager.java |  60 +-
 .../phoenix/mapreduce/CsvBulkLoadTool.java      |  10 +-
 .../phoenix/mapreduce/PhoenixRecordReader.java  |  12 +-
 .../phoenix/memory/GlobalMemoryManager.java     |   5 -
 .../apache/phoenix/monitoring/AtomicMetric.java |  70 ++
 .../phoenix/monitoring/CombinableMetric.java    |  77 ++
 .../monitoring/CombinableMetricImpl.java        |  77 ++
 .../org/apache/phoenix/monitoring/Counter.java  |  85 --
 .../phoenix/monitoring/GlobalClientMetrics.java | 117 +++
 .../apache/phoenix/monitoring/GlobalMetric.java |  37 +
 .../phoenix/monitoring/GlobalMetricImpl.java    |  74 ++
 .../phoenix/monitoring/MemoryMetricsHolder.java |  43 +
 .../org/apache/phoenix/monitoring/Metric.java   |  45 +-
 .../apache/phoenix/monitoring/MetricType.java   |  55 ++
 .../phoenix/monitoring/MetricsStopWatch.java    |  59 ++
 .../phoenix/monitoring/MutationMetricQueue.java | 131 +++
 .../phoenix/monitoring/NonAtomicMetric.java     |  71 ++
 .../phoenix/monitoring/OverAllQueryMetrics.java | 121 +++
 .../phoenix/monitoring/PhoenixMetrics.java      | 118 ---
 .../phoenix/monitoring/ReadMetricQueue.java     | 180 ++++
 .../phoenix/monitoring/SizeStatistic.java       |  78 --
 .../monitoring/SpoolingMetricsHolder.java       |  43 +
 .../monitoring/TaskExecutionMetricsHolder.java  |  68 ++
 .../phoenix/query/BaseQueryServicesImpl.java    |   2 +-
 .../org/apache/phoenix/query/QueryServices.java |   3 +-
 .../phoenix/query/QueryServicesOptions.java     |  25 +-
 .../phoenix/trace/PhoenixMetricsSink.java       |  36 +-
 .../java/org/apache/phoenix/util/JDBCUtil.java  |   5 +
 .../org/apache/phoenix/util/PhoenixRuntime.java | 175 +++-
 .../iterate/SpoolingResultIteratorTest.java     |   4 +-
 56 files changed, 2933 insertions(+), 849 deletions(-)
----------------------------------------------------------------------
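
Before the per-file diffs, a note on enabling collection: request-level metrics are
opt-in. A minimal sketch, mirroring the property that the new PhoenixMetricsIT below
toggles per connection (the URL is a placeholder):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    import org.apache.phoenix.query.QueryServices;

    public class EnableRequestMetrics {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Same property the new integration test sets per connection.
            props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
                // Statements on this connection now record request-level metrics.
            }
        }
    }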


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
deleted file mode 100644
index edb4042..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.MUTATION_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.NUM_SPOOL_FILE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_TIMEOUT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.REJECTED_TASK_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BATCH_SIZE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BYTES;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_COMMIT_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_SCANS;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.QUERY_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SCAN_BYTES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-
-import org.apache.phoenix.monitoring.Metric;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.junit.Test;
-
-public class PhoenixMetricsIT extends BaseHBaseManagedTimeIT {
-    
-    @Test
-    public void testResetPhoenixMetrics() {
-        resetMetrics();
-        for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) {
-            assertEquals(0, m.getTotalSum());
-            assertEquals(0, m.getNumberOfSamples());
-        }
-    }
-    
-    @Test
-    public void testPhoenixMetricsForQueries() throws Exception {
-        createTableAndInsertValues("T", true);
-        resetMetrics(); // we want to count metrics related only to the below query
-        Connection conn = DriverManager.getConnection(getUrl());
-        String query = "SELECT * FROM T";
-        ResultSet rs = conn.createStatement().executeQuery(query);
-        while (rs.next()) {
-            rs.getString(1);
-            rs.getString(2);
-        }
-        assertEquals(1, PARALLEL_SCANS.getMetric().getTotalSum());
-        assertEquals(1, QUERY_COUNT.getMetric().getTotalSum());
-        assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
-        assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
-        assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
-        assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
-        assertEquals(0, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
-        assertEquals(0, MUTATION_BYTES.getMetric().getTotalSum());
-        assertEquals(0, MUTATION_COMMIT_TIME.getMetric().getTotalSum());
-        
-        assertTrue(SCAN_BYTES.getMetric().getTotalSum() > 0);
-        assertTrue(QUERY_TIME.getMetric().getTotalSum() > 0);
-    }
-    
-    @Test
-    public void testPhoenixMetricsForMutations() throws Exception {
-        createTableAndInsertValues("T", true);
-        assertEquals(10, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
-        assertEquals(10, MUTATION_COUNT.getMetric().getTotalSum());
-        assertTrue(MUTATION_BYTES.getMetric().getTotalSum() > 0);
-        assertTrue(MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
-        assertEquals(0, PARALLEL_SCANS.getMetric().getTotalSum());
-        assertEquals(0, QUERY_COUNT.getMetric().getTotalSum());
-        assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
-        assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
-        assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
-        assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
-    }
-    
-    
-    @Test
-    public void testPhoenixMetricsForUpsertSelect() throws Exception { 
-        createTableAndInsertValues("T", true);
-        resetMetrics();
-        String ddl = "CREATE TABLE T2 (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
-        Connection conn = DriverManager.getConnection(getUrl());
-        conn.createStatement().execute(ddl);
-        resetMetrics();
-        String dml = "UPSERT INTO T2 (K, V) SELECT K, V FROM T";
-        conn.createStatement().executeUpdate(dml);
-        conn.commit();
-        assertEquals(10, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
-        assertEquals(1, MUTATION_COUNT.getMetric().getTotalSum());
-        assertEquals(1, PARALLEL_SCANS.getMetric().getTotalSum());
-        assertEquals(0, QUERY_TIME.getMetric().getTotalSum());
-        assertTrue(SCAN_BYTES.getMetric().getTotalSum() > 0);
-        assertTrue(MUTATION_BYTES.getMetric().getTotalSum() > 0);
-        assertTrue(MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
-        assertEquals(0, QUERY_COUNT.getMetric().getTotalSum());
-        assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
-        assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
-        assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
-        assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
-    }
-    
-    private static void resetMetrics() {
-        for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) {
-            m.reset();
-        }
-    }
-    
-    private static void createTableAndInsertValues(String tableName, boolean resetMetricsAfterTableCreate) throws Exception {
-        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
-        Connection conn = DriverManager.getConnection(getUrl());
-        conn.createStatement().execute(ddl);
-        if (resetMetricsAfterTableCreate) {
-            resetMetrics();
-        }
-        // executing 10 upserts/mutations.
-        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
-        PreparedStatement stmt = conn.prepareStatement(dml);
-        for (int i = 1; i <= 10; i++) {
-            stmt.setString(1, "key" + i);
-            stmt.setString(2, "value" + i);
-            stmt.executeUpdate();
-        }
-        conn.commit();
-    }
-    
-    
-    
-}
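
The deleted test above exercised the old PhoenixRuntime.getInternalPhoenixMetrics()
API; its replacement is the global-client view. A sketch of reading it, assuming
metric collection is enabled:

    import org.apache.phoenix.monitoring.GlobalMetric;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class GlobalMetricsDump {
        public static void main(String[] args) {
            if (PhoenixRuntime.areGlobalClientMetricsBeingCollected()) {
                for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
                    // Each metric carries a running total and a sample count.
                    System.out.println(m + " total=" + m.getTotalSum()
                            + " samples=" + m.getNumberOfSamples());
                    m.reset(); // zero the counter, as resetGlobalMetrics() in the new IT does
                }
            }
        }
    }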

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index c8696e2..e0f0a3c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -260,6 +260,7 @@ public class PartialCommitIT {
         PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
         final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
         return new PhoenixConnection(phxCon) {
+            @Override
             protected MutationState newMutationState(int maxSize) {
                 return new MutationState(maxSize, this, mutations);
             };
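
The rewritten PhoenixMetricsIT that follows drives the other half of the new API:
per-connection mutation metrics. A minimal usage sketch (placeholder URL and table):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Map;

    import org.apache.phoenix.util.PhoenixRuntime;

    public class MutationMetricsExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.createStatement().executeUpdate("UPSERT INTO T VALUES ('k', 'v')");
                conn.commit(); // mutation metrics are recorded at commit time
                // Writes: (table name) -> (metric name -> value)
                Map<String, Map<String, Long>> writes =
                        PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
                // Reads done on behalf of mutations (e.g. UPSERT SELECT, DELETE scans)
                Map<String, Map<String, Long>> reads =
                        PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(conn);
                System.out.println(writes);
                System.out.println(reads);
                PhoenixRuntime.resetMetrics(conn); // start a fresh accumulation window
            }
        }
    }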

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
new file mode 100644
index 0000000..d9ca8e8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -0,0 +1,815 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class PhoenixMetricsIT extends BaseOwnClusterHBaseManagedTimeIT {
+
+    private static final List<String> mutationMetricsToSkip = Lists
+            .newArrayList(MetricType.MUTATION_COMMIT_TIME.name());
+    private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(),
+            MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name());
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // Enable request metric collection at the driver level
+        props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testResetGlobalPhoenixMetrics() {
+        resetGlobalMetrics();
+        for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+            assertEquals(0, m.getTotalSum());
+            assertEquals(0, m.getNumberOfSamples());
+        }
+    }
+
+    @Test
+    public void testGlobalPhoenixMetricsForQueries() throws Exception {
+        createTableAndInsertValues("T", true);
+        resetGlobalMetrics(); // we want to count metrics related only to the below query
+        Connection conn = DriverManager.getConnection(getUrl());
+        String query = "SELECT * FROM T";
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        while (rs.next()) {
+            rs.getString(1);
+            rs.getString(2);
+        }
+        assertEquals(1, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
+        assertEquals(1, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_MUTATION_BYTES.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum());
+
+        assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0);
+        assertTrue(GLOBAL_QUERY_TIME.getMetric().getTotalSum() > 0);
+        assertTrue(GLOBAL_TASK_END_TO_END_TIME.getMetric().getTotalSum() > 0);
+        assertTrue(GLOBAL_TASK_EXECUTION_TIME.getMetric().getTotalSum() > 0);
+    }
+
+    @Test
+    public void testGlobalPhoenixMetricsForMutations() throws Exception {
+        createTableAndInsertValues("T", true);
+        assertEquals(10, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+        assertEquals(10, GLOBAL_MUTATION_SQL_COUNTER.getMetric().getTotalSum());
+        assertTrue(GLOBAL_MUTATION_BYTES.getMetric().getTotalSum() > 0);
+        assertTrue(GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
+        assertEquals(0, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+    }
+
+    @Test
+    public void testGlobalPhoenixMetricsForUpsertSelect() throws Exception {
+        createTableAndInsertValues("T", true);
+        resetGlobalMetrics();
+        String ddl = "CREATE TABLE T2 (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+        resetGlobalMetrics();
+        String dml = "UPSERT INTO T2 (K, V) SELECT K, V FROM T";
+        conn.createStatement().executeUpdate(dml);
+        conn.commit();
+        assertEquals(10, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+        assertEquals(1, GLOBAL_MUTATION_SQL_COUNTER.getMetric().getTotalSum());
+        assertEquals(1, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_QUERY_TIME.getMetric().getTotalSum());
+        assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0);
+        assertTrue(GLOBAL_MUTATION_BYTES.getMetric().getTotalSum() > 0);
+        assertTrue(GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
+        assertEquals(0, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+    }
+
+    private static void resetGlobalMetrics() {
+        for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+            m.reset();
+        }
+    }
+
+    private static void createTableAndInsertValues(String tableName, boolean resetGlobalMetricsAfterTableCreate)
+            throws Exception {
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+        if (resetGlobalMetricsAfterTableCreate) {
+            resetGlobalMetrics();
+        }
+        // executing 10 upserts/mutations.
+        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        for (int i = 1; i <= 10; i++) {
+            stmt.setString(1, "key" + i);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+        conn.commit();
+    }
+
+    @Test
+    public void testOverallQueryMetricsForSelect() throws Exception {
+        String tableName = "SCANMETRICS";
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 6";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+    }
+
+    @Test
+    public void testReadMetricsForSelect() throws Exception {
+        String tableName = "READMETRICSFORSELECT";
+        long numSaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + numSaltBuckets;
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+
+        long numRows = 1000;
+        long numExpectedTasks = numSaltBuckets;
+        insertRowsInTable(tableName, numRows);
+
+        String query = "SELECT * FROM " + tableName;
+        Statement stmt = conn.createStatement();
+        ResultSet rs = stmt.executeQuery(query);
+        PhoenixResultSet resultSetBeingTested = rs.unwrap(PhoenixResultSet.class);
+        changeInternalStateForTesting(resultSetBeingTested);
+        while (resultSetBeingTested.next()) {}
+        resultSetBeingTested.close();
+        Set<String> expectedTableNames = Sets.newHashSet(tableName);
+        assertReadMetricValuesForSelectSql(Lists.newArrayList(numRows), Lists.newArrayList(numExpectedTasks),
+                resultSetBeingTested, expectedTableNames);
+    }
+
+    @Test
+    public void testMetricsForUpsert() throws Exception {
+        String tableName = "UPSERTMETRICS";
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 6";
+        Connection ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+
+        int numRows = 10;
+        Connection conn = insertRowsInTable(tableName, numRows);
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+        for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) {
+            String t = entry.getKey();
+            assertEquals("Table names didn't match!", tableName, t);
+            Map<String, Long> p = entry.getValue();
+            assertEquals("There should have been three metrics", 3, p.size());
+            boolean mutationBatchSizePresent = false;
+            boolean mutationCommitTimePresent = false;
+            boolean mutationBytesPresent = false;
+            for (Entry<String, Long> metric : p.entrySet()) {
+                String metricName = metric.getKey();
+                long metricValue = metric.getValue();
+                if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) {
+                    assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
+                    mutationBatchSizePresent = true;
+                } else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) {
+                    assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
+                    mutationCommitTimePresent = true;
+                } else if (metricName.equals(MetricType.MUTATION_BYTES.name())) {
+                    assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
+                    mutationBytesPresent = true;
+                }
+            }
+            assertTrue(mutationBatchSizePresent);
+            assertTrue(mutationCommitTimePresent);
+            assertTrue(mutationBytesPresent);
+        }
+        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        assertEquals("Read metrics should be empty", 0, readMetrics.size());
+    }
+
+    @Test
+    public void testMetricsForUpsertSelect() throws Exception {
+        String tableName1 = "UPSERTFROM";
+        long table1SaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + table1SaltBuckets;
+        Connection ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+        int numRows = 10;
+        insertRowsInTable(tableName1, numRows);
+
+        String tableName2 = "UPSERTTO";
+        ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10";
+        ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+
+        Connection conn = DriverManager.getConnection(getUrl());
+        String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
+        conn.createStatement().executeUpdate(upsertSelect);
+        conn.commit();
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+
+        Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+        assertMutationMetrics(tableName2, numRows, mutationMetrics);
+        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        assertReadMetricsForMutatingSql(tableName1, table1SaltBuckets, readMetrics);
+    }
+
+    @Test
+    public void testMetricsForDelete() throws Exception {
+        String tableName = "DELETEMETRICS";
+        long tableSaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + tableSaltBuckets;
+        Connection ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+        int numRows = 10;
+        insertRowsInTable(tableName, numRows);
+        Connection conn = DriverManager.getConnection(getUrl());
+        String delete = "DELETE FROM " + tableName;
+        conn.createStatement().execute(delete);
+        conn.commit();
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+        assertMutationMetrics(tableName, numRows, mutationMetrics);
+
+        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        assertReadMetricsForMutatingSql(tableName, tableSaltBuckets, readMetrics);
+    }
+
+    @Test
+    public void testNoMetricsCollectedForConnection() throws Exception {
+        String tableName = "NOMETRICS";
+        long tableSaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + tableSaltBuckets;
+        Connection ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+        int numRows = 10;
+        insertRowsInTable(tableName, numRows);
+        Properties props = new Properties();
+        props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "false");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+        while (rs.next()) {}
+        rs.close();
+        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getRequestReadMetrics(rs);
+        assertTrue("No read metrics should have been generated", readMetrics.size() == 0);
+        conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " VALUES ('KEY', 'VALUE')");
+        conn.commit();
+        Map<String, Map<String, Long>> writeMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+        assertTrue("No write metrics should have been generated", writeMetrics.size() == 0);
+    }
+
+    @Test
+    public void testMetricsForUpsertWithAutoCommit() throws Exception {
+        String tableName = "VERIFYUPSERTAUTOCOMMIT";
+        long tableSaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + tableSaltBuckets;
+        try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
+            ddlConn.createStatement().execute(ddl);
+        }
+
+        String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        int numRows = 10;
+        Map<String, Map<String, Long>> mutationMetricsForAutoCommitOff = null;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            upsertRows(upsert, numRows, conn);
+            conn.commit();
+            mutationMetricsForAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+        }
+
+        // Insert rows now with auto-commit on
+        Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            upsertRows(upsert, numRows, conn);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+        }
+        // Verify that the mutation metrics are the same for both cases
+        assertMetricsAreSame(mutationMetricsForAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+    }
+
+    private void upsertRows(String upsert, int numRows, Connection conn) throws SQLException {
+        PreparedStatement stmt = conn.prepareStatement(upsert);
+        for (int i = 1; i <= numRows; i++) {
+            stmt.setString(1, "key" + i);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+    }
+
+    @Test
+    public void testMetricsForDeleteWithAutoCommit() throws Exception {
+        String tableName = "VERIFYDELETEAUTOCOMMIT";
+        long tableSaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + tableSaltBuckets;
+        try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
+            ddlConn.createStatement().execute(ddl);
+        }
+
+        String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        int numRows = 10;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            upsertRows(upsert, numRows, conn);
+            conn.commit();
+        }
+
+        String delete = "DELETE FROM " + tableName;
+        // Delete rows now with auto-commit off
+        Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOff = null;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            conn.createStatement().executeUpdate(delete);
+            deleteMetricsWithAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+        }
+
+        // Upsert the rows back
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            upsertRows(upsert, numRows, conn);
+            conn.commit();
+        }
+
+        // Now delete rows with auto-commit on
+        Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOn = null;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            conn.createStatement().executeUpdate(delete);
+            deleteMetricsWithAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+        }
+
+        // Verify that the mutation metrics are the same for both cases.
+        assertMetricsAreSame(deleteMetricsWithAutoCommitOff, deleteMetricsWithAutoCommitOn, mutationMetricsToSkip);
+    }
+
+    @Test
+    public void testMetricsForUpsertSelectWithAutoCommit() throws Exception {
+        String tableName1 = "UPSERTFROMAUTOCOMMIT";
+        long table1SaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + table1SaltBuckets;
+        Connection ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+        int numRows = 10;
+        insertRowsInTable(tableName1, numRows);
+
+        String tableName2 = "UPSERTTOAUTCOMMIT";
+        ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10";
+        ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+
+        String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
+
+        Map<String, Map<String, Long>> mutationMetricsAutoCommitOff = null;
+        Map<String, Map<String, Long>> readMetricsAutoCommitOff = null;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            conn.createStatement().executeUpdate(upsertSelect);
+            conn.commit();
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            mutationMetricsAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOff = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        }
+
+        Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null;
+        Map<String, Map<String, Long>> readMetricsAutoCommitOn = null;
+
+        int autoCommitBatchSize = numRows + 1; // batch size of 11 is greater than numRows, so all rows commit in a single batch
+        Properties props = new Properties();
+        props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            conn.createStatement().executeUpdate(upsertSelect);
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        }
+        assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+        assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+
+        autoCommitBatchSize = numRows - 1; // batch size of 9 is less than numRows and is not a divisor of numRows
+        props = new Properties();
+        props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            conn.createStatement().executeUpdate(upsertSelect);
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        }
+        assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+        assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+
+        autoCommitBatchSize = numRows;
+        props = new Properties();
+        props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            conn.createStatement().executeUpdate(upsertSelect);
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        }
+        assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+        assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+
+        autoCommitBatchSize = 2; // multiple batches of equal size
+        props = new Properties();
+        props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            conn.createStatement().executeUpdate(upsertSelect);
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        }
+        assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+        assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+    }
+
+    @Test
+    public void testMutationMetricsWhenUpsertingToMultipleTables() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String table1 = "TABLE1";
+            createTableAndInsertValues(true, 10, conn, table1);
+            String table2 = "TABLE2";
+            createTableAndInsertValues(true, 10, conn, table2);
+            String table3 = "TABLE3";
+            createTableAndInsertValues(true, 10, conn, table3);
+            Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+            assertTrue("Mutation metrics not present for " + table1, mutationMetrics.get(table1) != null);
+            assertTrue("Mutation metrics not present for " + table2, mutationMetrics.get(table2) != null);
+            assertTrue("Mutation metrics not present for " + table3, mutationMetrics.get(table3) != null);
+            assertMetricsHaveSameValues(mutationMetrics.get(table1), mutationMetrics.get(table2), mutationMetricsToSkip);
+            assertMetricsHaveSameValues(mutationMetrics.get(table1), mutationMetrics.get(table3), mutationMetricsToSkip);
+        }
+    }
+
+    @Test
+    public void testClosingConnectionClearsMetrics() throws Exception {
+        Connection conn = null;
+        try {
+            conn = DriverManager.getConnection(getUrl());
+            createTableAndInsertValues(true, 10, conn, "clearmetrics");
+            assertTrue("Mutation metrics not present", PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() > 0);
+        } finally {
+            if (conn != null) {
+                conn.close();
+                assertTrue("Closing connection didn't clear metrics",
+                        PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() == 0);
+            }
+        }
+    }
+
+    @Test
+    public void testMetricsForUpsertingIntoImmutableTableWithIndices() throws Exception {
+        String dataTable = "IMMTABLEWITHINDICES";
+        String tableDdl = "CREATE TABLE "
+                + dataTable
+                + " (K1 VARCHAR NOT NULL, K2 VARCHAR NOT NULL, V1 INTEGER, V2 INTEGER, V3 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(K1, K2)) IMMUTABLE_ROWS = true";
+        String index1 = "I1";
+        String index1Ddl = "CREATE INDEX " + index1 + " ON " + dataTable + " (V1) include (V2)";
+        String index2 = "I2";
+        String index2Ddl = "CREATE INDEX " + index2 + " ON " + dataTable + " (V2) include (V3)";
+        String index3 = "I3";
+        String index3Ddl = "CREATE INDEX " + index3 + " ON " + dataTable + " (V3) include (V1)";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(tableDdl);
+            conn.createStatement().execute(index1Ddl);
+            conn.createStatement().execute(index2Ddl);
+            conn.createStatement().execute(index3Ddl);
+        }
+        String upsert = "UPSERT INTO " + dataTable + " VALUES (?, ?, ?, ?, ?)";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            /*
+             * Upsert data into table. Because the table is immutable, mutations for updating the indices on it are
+             * handled by the client itself. So mutation metrics should include mutations for the indices as well as the
+             * data table.
+             */
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            for (int i = 1; i < 10; i++) {
+                stmt.setString(1, "key1" + i);
+                stmt.setString(2, "key2" + i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.setInt(5, i);
+                stmt.executeUpdate();
+            }
+            conn.commit();
+            Map<String, Map<String, Long>> metrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+            assertTrue(metrics.get(dataTable).size() > 0);
+            assertTrue(metrics.get(index1).size() > 0);
+            assertTrue(metrics.get(index2).size() > 0);
+            assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index2), mutationMetricsToSkip);
+            assertTrue(metrics.get(index3).size() > 0);
+            assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index3), mutationMetricsToSkip);
+        }
+    }
+    
+    @Test
+    public void testMetricsForUpsertSelectSameTable() throws Exception {
+        String tableName = "UPSERTSAME";
+        long table1SaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + table1SaltBuckets;
+        Connection ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+        int numRows = 10;
+        insertRowsInTable(tableName, numRows);
+
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.setAutoCommit(false);
+        String upsertSelect = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName;
+        conn.createStatement().executeUpdate(upsertSelect);
+        conn.commit();
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        
+        Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+        // Because auto-commit is off, upsert select into the same table will run on the client.
+        // So we should have client side read and write metrics available.
+        assertMutationMetrics(tableName, numRows, mutationMetrics);
+        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        assertReadMetricsForMutatingSql(tableName, table1SaltBuckets, readMetrics);
+        PhoenixRuntime.resetMetrics(pConn);
+        // Even with auto-commit on, this upsert select still runs on the client side.
+        conn.setAutoCommit(true);
+        conn.createStatement().executeUpdate(upsertSelect);
+        Map<String, Map<String, Long>> autoCommitMutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+        Map<String, Map<String, Long>> autoCommitReadMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        assertMetricsAreSame(mutationMetrics, autoCommitMutationMetrics, mutationMetricsToSkip);
+        assertMetricsAreSame(readMetrics, autoCommitReadMetrics, readMetricsToSkip);
+    }
+    
+    private void createTableAndInsertValues(boolean commit, int numRows, Connection conn, String tableName)
+            throws SQLException {
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+        conn.createStatement().execute(ddl);
+        // executing numRows upserts/mutations.
+        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        for (int i = 1; i <= numRows; i++) {
+            stmt.setString(1, "key" + i);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+        if (commit) {
+            conn.commit();
+        }
+    }
+
+    private void assertMetricsAreSame(Map<String, Map<String, Long>> metric1, Map<String, Map<String, Long>> metric2,
+            List<String> metricsToSkip) {
+        assertTrue("The two metrics have different or unequal number of table names ",
+                metric1.keySet().equals(metric2.keySet()));
+        for (Entry<String, Map<String, Long>> entry : metric1.entrySet()) {
+            Map<String, Long> metricNameValueMap1 = entry.getValue();
+            Map<String, Long> metricNameValueMap2 = metric2.get(entry.getKey());
+            assertMetricsHaveSameValues(metricNameValueMap1, metricNameValueMap2, metricsToSkip);
+        }
+    }
+
+    private void assertMetricsHaveSameValues(Map<String, Long> metricNameValueMap1,
+            Map<String, Long> metricNameValueMap2, List<String> metricsToSkip) {
+        assertTrue("The two metrics have different or unequal number of metric names ", metricNameValueMap1.keySet()
+                .equals(metricNameValueMap2.keySet()));
+        for (Entry<String, Long> entry : metricNameValueMap1.entrySet()) {
+            String metricName = entry.getKey();
+            if (!metricsToSkip.contains(metricName)) {
+                assertEquals("Unequal values for metric " + metricName, entry.getValue(),
+                        metricNameValueMap2.get(metricName));
+            }
+        }
+    }
+
+    private void changeInternalStateForTesting(PhoenixResultSet rs) {
+        // Swap in a test metrics queue via reflection so metric deltas are deterministic.
+        ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(true);
+        StatementContext ctx = (StatementContext)Whitebox.getInternalState(rs, "context");
+        Whitebox.setInternalState(ctx, "readMetricsQueue", testMetricsQueue);
+        Whitebox.setInternalState(rs, "readMetricsQueue", testMetricsQueue);
+    }
+
+    private void assertReadMetricValuesForSelectSql(ArrayList<Long> numRows, ArrayList<Long> numExpectedTasks,
+            PhoenixResultSet resultSetBeingTested, Set<String> expectedTableNames) throws SQLException {
+        Map<String, Map<String, Long>> metrics = PhoenixRuntime.getRequestReadMetrics(resultSetBeingTested);
+        int counter = 0;
+        for (Entry<String, Map<String, Long>> entry : metrics.entrySet()) {
+            String tableName = entry.getKey();
+            expectedTableNames.remove(tableName);
+            Map<String, Long> metricValues = entry.getValue();
+            boolean scanMetricsPresent = false;
+            boolean taskCounterMetricsPresent = false;
+            boolean taskExecutionTimeMetricsPresent = false;
+            boolean memoryMetricsPresent = false;
+            for (Entry<String, Long> pair : metricValues.entrySet()) {
+                String metricName = pair.getKey();
+                long metricValue = pair.getValue();
+                long n = numRows.get(counter);
+                long numTask = numExpectedTasks.get(counter);
+                if (metricName.equals(SCAN_BYTES.name())) {
+                    // we are using a SCAN_BYTES_DELTA of 1. So number of scan bytes read should be number of rows read
+                    assertEquals(n, metricValue);
+                    scanMetricsPresent = true;
+                } else if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
+                    assertEquals(numTask, metricValue);
+                    taskCounterMetricsPresent = true;
+                } else if (metricName.equals(TASK_EXECUTION_TIME.name())) {
+                    assertEquals(numTask * TASK_EXECUTION_TIME_DELTA, metricValue);
+                    taskExecutionTimeMetricsPresent = true;
+                } else if (metricName.equals(MEMORY_CHUNK_BYTES.name())) {
+                    assertEquals(numTask * MEMORY_CHUNK_BYTES_DELTA, metricValue);
+                    memoryMetricsPresent = true;
+                }
+            }
+            counter++;
+            assertTrue(scanMetricsPresent);
+            assertTrue(taskCounterMetricsPresent);
+            assertTrue(taskExecutionTimeMetricsPresent);
+            assertTrue(memoryMetricsPresent);
+        }
+        PhoenixRuntime.resetMetrics(resultSetBeingTested);
+        assertTrue("Metrics not found tables " + Joiner.on(",").join(expectedTableNames),
+                expectedTableNames.size() == 0);
+    }
+
+    private Connection insertRowsInTable(String tableName, long numRows) throws SQLException {
+        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        for (int i = 1; i <= numRows; i++) {
+            stmt.setString(1, "key" + i);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+        conn.commit();
+        return conn;
+    }
+
+    // with a SCAN_BYTES_DELTA of 1, each record read adds one byte, so total scan bytes should equal the number of records read
+    public static final long SCAN_BYTES_DELTA = 1;
+
+    // total task execution time should be numTasks * TASK_EXECUTION_TIME_DELTA
+    public static final long TASK_EXECUTION_TIME_DELTA = 10;
+
+    // total memory chunk bytes should be numTasks * MEMORY_CHUNK_BYTES_DELTA
+    public static final long MEMORY_CHUNK_BYTES_DELTA = 100;
+
+    private class TestReadMetricsQueue extends ReadMetricQueue {
+
+        public TestReadMetricsQueue(boolean isRequestMetricsEnabled) {
+            super(isRequestMetricsEnabled);
+        }
+
+        @Override
+        public CombinableMetric getMetric(MetricType type) {
+            switch (type) {
+            case SCAN_BYTES:
+                return new CombinableMetricImpl(type) {
+
+                    @Override
+                    public void change(long delta) {
+                        super.change(SCAN_BYTES_DELTA);
+                    }
+                };
+            case TASK_EXECUTION_TIME:
+                return new CombinableMetricImpl(type) {
+
+                    @Override
+                    public void change(long delta) {
+                        super.change(TASK_EXECUTION_TIME_DELTA);
+                    }
+                };
+            case MEMORY_CHUNK_BYTES:
+                return new CombinableMetricImpl(type) {
+
+                    @Override
+                    public void change(long delta) {
+                        super.change(MEMORY_CHUNK_BYTES_DELTA);
+                    }
+                };
+            }
+            return super.getMetric(type);
+        }
+    }
+
+    private void assertReadMetricsForMutatingSql(String tableName, long tableSaltBuckets,
+            Map<String, Map<String, Long>> readMetrics) {
+        assertTrue("No read metrics present when there should have been!", readMetrics.size() > 0);
+        int numTables = 0;
+        for (Entry<String, Map<String, Long>> entry : readMetrics.entrySet()) {
+            String t = entry.getKey();
+            assertEquals("Table name didn't match for read metrics", tableName, t);
+            numTables++;
+            Map<String, Long> p = entry.getValue();
+            assertTrue("No read metrics present when there should have been", p.size() > 0);
+            for (Entry<String, Long> metric : p.entrySet()) {
+                String metricName = metric.getKey();
+                long metricValue = metric.getValue();
+                if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
+                    assertEquals(tableSaltBuckets, metricValue);
+                } else if (metricName.equals(SCAN_BYTES.name())) {
+                    assertTrue("Scan bytes read should be greater than zero", metricValue > 0);
+                }
+            }
+        }
+        assertEquals("There should have been read metrics only for one table: " + tableName, 1, numTables);
+    }
+
+    private void assertMutationMetrics(String tableName, int numRows, Map<String, Map<String, Long>> mutationMetrics) {
+        assertTrue("No mutation metrics present when there should have been", mutationMetrics.size() > 0);
+        for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) {
+            String t = entry.getKey();
+            assertEquals("Table name didn't match for mutation metrics", tableName, t);
+            Map<String, Long> p = entry.getValue();
+            assertEquals("There should have been three metrics", 3, p.size());
+            for (Entry<String, Long> metric : p.entrySet()) {
+                String metricName = metric.getKey();
+                long metricValue = metric.getValue();
+                if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) {
+                    assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
+                } else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) {
+                    assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
+                } else if (metricName.equals(MetricType.MUTATION_BYTES.name())) {
+                    assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
+                }
+            }
+        }
+    }
+
+}

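A note for readers following the tests above: the request-level write metrics they exercise are read back through PhoenixRuntime. The following is a minimal usage sketch, not part of the patch; the URL, the table name T, and the key/value literals are illustrative only:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.util.Map;

    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class WriteMetricsExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.createStatement().execute(
                        "CREATE TABLE T (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)");
                try (PreparedStatement stmt = conn.prepareStatement("UPSERT INTO T VALUES (?, ?)")) {
                    stmt.setString(1, "key1");
                    stmt.setString(2, "value1");
                    stmt.executeUpdate();
                }
                conn.commit();
                // table name -> (metric name -> value), as asserted in the tests above
                Map<String, Map<String, Long>> writeMetrics =
                        PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
                System.out.println(writeMetrics);
                // clear this connection's accumulated request metrics
                PhoenixRuntime.resetMetrics(conn.unwrap(PhoenixConnection.class));
            }
        }
    }
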
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 9718709..9ad9ef5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.cache;
 
+import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INSTANCE;
 import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
 
 import java.io.Closeable;
@@ -57,6 +58,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachin
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -226,6 +228,11 @@ public class ServerCacheClient {
                         public Object getJobId() {
                             return ServerCacheClient.this;
                         }
+                        
+                        @Override
+                        public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+                            return NO_OP_INSTANCE;
+                        }
                     }));
                 } else {
                     if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry", connection));}

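The override added above recurs wherever work is scheduled through the JobManager: tasks whose execution should not be attributed to any request report the shared no-op holder. A minimal sketch of that pattern, assuming (consistent with this file) that JobManager.JobCallable extends java.util.concurrent.Callable:

    import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INSTANCE;

    import org.apache.phoenix.job.JobManager.JobCallable;
    import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;

    JobCallable<Boolean> cacheTask = new JobCallable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
            return Boolean.TRUE; // the actual cache-distribution work would go here
        }

        @Override
        public Object getJobId() {
            return this; // group related tasks under one job id
        }

        @Override
        public TaskExecutionMetricsHolder getTaskExecutionMetric() {
            // Cache distribution is bookkeeping, not query work, so it should
            // not show up in request-level task metrics.
            return NO_OP_INSTANCE;
        }
    };
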
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 575f0f3..a28f614 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -94,8 +94,9 @@ public class DeleteCompiler {
         this.statement = statement;
     }
     
-    private static MutationState deleteRows(PhoenixStatement statement, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
+    private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
         PTable table = targetTableRef.getTable();
+        PhoenixStatement statement = childContext.getStatement();
         PhoenixConnection connection = statement.getConnection();
         PName tenantId = connection.getTenantId();
         byte[] tenantIdBytes = null;
@@ -114,19 +115,18 @@ public class DeleteCompiler {
         if (indexTableRef != null) {
             indexMutations = Maps.newHashMapWithExpectedSize(batchSize);
         }
-        try {
-            List<PColumn> pkColumns = table.getPKColumns();
-            boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
-            boolean isSharedViewIndex = table.getViewIndexId() != null;
-            int offset = (table.getBucketNum() == null ? 0 : 1);
-            byte[][] values = new byte[pkColumns.size()][];
-            if (isMultiTenant) {
-                values[offset++] = tenantIdBytes;
-            }
-            if (isSharedViewIndex) {
-                values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
-            }
-            PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, statement);
+        List<PColumn> pkColumns = table.getPKColumns();
+        boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
+        boolean isSharedViewIndex = table.getViewIndexId() != null;
+        int offset = (table.getBucketNum() == null ? 0 : 1);
+        byte[][] values = new byte[pkColumns.size()][];
+        if (isMultiTenant) {
+            values[offset++] = tenantIdBytes;
+        }
+        if (isSharedViewIndex) {
+            values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
+        }
+        try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
             int rowCount = 0;
             while (rs.next()) {
                 ImmutableBytesPtr ptr = new ImmutableBytesPtr();  // allocate new as this is a key in a Map
@@ -183,8 +183,6 @@ public class DeleteCompiler {
                 state.join(indexState);
             }
             return state;
-        } finally {
-            iterator.close();
         }
     }
     
@@ -199,9 +197,16 @@ public class DeleteCompiler {
         }
         
         @Override
-        protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
+        protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
             PhoenixStatement statement = new PhoenixStatement(connection);
-            return deleteRows(statement, targetTableRef, indexTableRef, iterator, projector, sourceTableRef);
+            /*
+             * We don't want to collect any read metrics within the child context, because any read metrics that
+             * need to be captured are already being collected in the parent statement context, which is enclosed in
+             * the result iterator used for reading rows out.
+             */
+            StatementContext ctx = new StatementContext(statement, false);
+            MutationState state = deleteRows(ctx, targetTableRef, indexTableRef, iterator, projector, sourceTableRef);
+            return state;
         }
         
         public void setTargetTableRef(TableRef tableRef) {
@@ -559,9 +564,14 @@ public class DeleteCompiler {
                             }
                             // Return total number of rows that have been deleted. In the case of auto commit being off
                             // the mutations will all be in the mutation state of the current connection.
-                            return new MutationState(maxSize, connection, totalRowCount);
+                            MutationState state = new MutationState(maxSize, connection, totalRowCount);
+                            
+                            // Set the read metrics accumulated in the parent context so that they can be published when the mutations are committed.
+                            state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());
+                            
+                            return state;
                         } else {
-                            return deleteRows(statement, tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef());
+                            return deleteRows(plan.getContext(), tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef());
                         }
                     }
     

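Taken together, the DeleteCompiler changes follow a single pattern: meter reads once, in the parent plan's context, and attach that context's read metric queue to the returned MutationState so the metrics are published when the mutations commit. A condensed sketch of the pattern (not a verbatim excerpt; plan, statement, and the deleteRows arguments stand in for the surrounding code):

    // Child work runs under a context that collects no read metrics of its own ...
    StatementContext childContext = new StatementContext(statement, false);
    MutationState state = deleteRows(childContext, targetTableRef, indexTableRef,
            iterator, projector, sourceTableRef);
    // ... while the reads are metered by the parent plan's context, whose queue
    // is handed to the mutation state so the metrics surface at commit time.
    state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());
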
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index bcac17d..630760c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -35,9 +35,9 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.KeyValueUtil;
 
 /**
@@ -53,21 +53,34 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
     /**
      * Method that does the actual mutation work
      */
-    abstract protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
+    abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
     
     @Override
-    public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator, Scan scan) throws SQLException {
-        final PhoenixConnection connection = new PhoenixConnection(this.connection);
-        MutationState state = mutate(context, iterator, connection);
+    public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName) throws SQLException {
+        final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
+        
+        MutationState state = mutate(parentContext, iterator, clonedConnection);
+        
         long totalRowCount = state.getUpdateCount();
-        if (connection.getAutoCommit()) {
-            connection.getMutationState().join(state);
-            connection.commit();
-            ConnectionQueryServices services = connection.getQueryServices();
-            int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
-            state = new MutationState(maxSize, connection, totalRowCount);
+        if (clonedConnection.getAutoCommit()) {
+            clonedConnection.getMutationState().join(state);
+            clonedConnection.commit();
+            ConnectionQueryServices services = clonedConnection.getQueryServices();
+            int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+            /*
+             * Everything that was mutated as part of the clonedConnection has been committed. However, we want to
+             * report the mutation work done using this clonedConnection as part of the overall mutation work of the
+             * parent connection. So we need to set those metrics in the empty mutation state so that they can be
+             * combined with the parent connection's mutation metrics (as part of combining mutation state) in the
+             * close() method of the iterator being returned. Don't combine the read metrics into the parent context
+             * yet, though, because other threads may still be modifying them concurrently at this stage. Instead, we
+             * will get hold of the read metrics once all the mutating iterators are done.
+             */
+            state = MutationState.emptyMutationState(maxSize, clonedConnection);
+            state.getMutationMetricQueue().combineMetricQueues(clonedConnection.getMutationState().getMutationMetricQueue());
         }
         final MutationState finalState = state;
+        
         byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
         KeyValue keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
         final Tuple tuple = new SingleKeyValueTuple(keyValue);
@@ -90,13 +103,17 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
             @Override
             public void close() throws SQLException {
                 try {
-                    // Join the child mutation states in close, since this is called in a single threaded manner
-                    // after the parallel results have been processed.
-                    if (!connection.getAutoCommit()) {
-                        MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState);
-                    }
+                    /* 
+                     * Join the child mutation states in close, since this is called in a single-threaded manner
+                     * after the parallel results have been processed.
+                     * If auto-commit is on for the cloned child connection, then the finalState here is an empty mutation
+                     * state (with no mutations). However, it still carries the metrics for the mutation work done by the
+                     * mutating iterator. Joining the mutation state ensures those metrics are passed on
+                     * to the parent connection.
+                     */
+                    MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState);
                 } finally {
-                    connection.close();
+                    clonedConnection.close();
                 }
             }
 

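The auto-commit branch above amounts to using an empty MutationState purely as a metrics carrier. A sketch of the hand-off, with parentConnection standing in for MutatingParallelIteratorFactory.this.connection:

    // After the cloned connection commits there is nothing left to mutate, but
    // the mutation metrics it accumulated still need to reach the parent.
    MutationState carrier = MutationState.emptyMutationState(maxSize, clonedConnection);
    carrier.getMutationMetricQueue()
            .combineMetricQueues(clonedConnection.getMutationState().getMutationMetricQueue());

    // Later, in close(), joining the carrier folds those metrics into the parent:
    parentConnection.getMutationState().join(carrier);
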
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index d726488..52bb7f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.monitoring.OverAllQueryMetrics;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -41,6 +43,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.NumberUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
 
 import com.google.common.collect.Maps;
 
@@ -80,10 +83,19 @@ public class StatementContext {
     private TimeRange scanTimeRange = null;
 
     private Map<SelectStatement, Object> subqueryResults;
-
+    private final ReadMetricQueue readMetricsQueue;
+    private final OverAllQueryMetrics overAllQueryMetrics;
+    
     public StatementContext(PhoenixStatement statement) {
         this(statement, new Scan());
     }
+    
+    /**
+     * Constructor that lets you override whether or not to collect request-level metrics.
+     */
+    public StatementContext(PhoenixStatement statement, boolean collectRequestLevelMetrics) {
+        this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan(), new SequenceManager(statement), collectRequestLevelMetrics);
+    }
 
     public StatementContext(PhoenixStatement statement, Scan scan) {
         this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan(), new SequenceManager(statement));
@@ -94,6 +106,10 @@ public class StatementContext {
     }
 
     public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager) {
+        this(statement, resolver, scan, seqManager, statement.getConnection().isRequestLevelMetricsEnabled());
+    }
+    
+    public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager, boolean isRequestMetricsEnabled) {
         this.statement = statement;
         this.resolver = resolver;
         this.scan = scan;
@@ -102,20 +118,24 @@ public class StatementContext {
         this.aggregates = new AggregationManager();
         this.expressions = new ExpressionManager();
         PhoenixConnection connection = statement.getConnection();
-        this.dateFormat = connection.getQueryServices().getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
+        ReadOnlyProps props = connection.getQueryServices().getProps();
+        this.dateFormat = props.get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
         this.dateFormatter = DateUtil.getDateFormatter(dateFormat);
-        this.timeFormat = connection.getQueryServices().getProps().get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT);
+        this.timeFormat = props.get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT);
         this.timeFormatter = DateUtil.getTimeFormatter(timeFormat);
-        this.timestampFormat = connection.getQueryServices().getProps().get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
+        this.timestampFormat = props.get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
         this.timestampFormatter = DateUtil.getTimestampFormatter(timestampFormat);
-        this.dateFormatTimeZone = TimeZone.getTimeZone(
-                connection.getQueryServices().getProps().get(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB, DateUtil.DEFAULT_TIME_ZONE_ID));
-        this.numberFormat = connection.getQueryServices().getProps().get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
+        this.dateFormatTimeZone = TimeZone.getTimeZone(props.get(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB,
+                DateUtil.DEFAULT_TIME_ZONE_ID));
+        this.numberFormat = props.get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
         this.tempPtr = new ImmutableBytesWritable();
         this.currentTable = resolver != null && !resolver.getTables().isEmpty() ? resolver.getTables().get(0) : null;
-        this.whereConditionColumns = new ArrayList<Pair<byte[],byte[]>>();
-        this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer>emptyMap() : Maps.<PColumn, Integer>newLinkedHashMap();
-        this.subqueryResults = Maps.<SelectStatement, Object>newHashMap();
+        this.whereConditionColumns = new ArrayList<Pair<byte[], byte[]>>();
+        this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer> emptyMap() : Maps
+                .<PColumn, Integer> newLinkedHashMap();
+        this.subqueryResults = Maps.<SelectStatement, Object> newHashMap();
+        this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled);
+        this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled);
     }
 
     /**
@@ -285,4 +305,13 @@ public class StatementContext {
     public void setSubqueryResult(SelectStatement select, Object result) {
         subqueryResults.put(select, result);
     }
+    
+    public ReadMetricQueue getReadMetricsQueue() {
+        return readMetricsQueue;
+    }
+    
+    public OverAllQueryMetrics getOverallQueryMetrics() {
+        return overAllQueryMetrics;
+    }
+    
 }

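A short usage sketch of the constructors and accessors added above; statement is assumed to be an existing PhoenixStatement:

    // Default: whether request-level metrics are collected follows the
    // connection-level setting.
    StatementContext ctx = new StatementContext(statement);
    ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
    OverAllQueryMetrics overall = ctx.getOverallQueryMetrics();

    // Explicit opt-out, used for internal child statements whose reads are
    // already metered by an enclosing parent context.
    StatementContext childCtx = new StatementContext(statement, false);
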
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 2b35d4f..7b39a28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -118,43 +118,40 @@ public class UpsertCompiler {
         mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter()));
     }
 
-    private static MutationState upsertSelect(PhoenixStatement statement, 
-            TableRef tableRef, RowProjector projector, ResultIterator iterator, int[] columnIndexes,
-            int[] pkSlotIndexes) throws SQLException {
-        try {
-            PhoenixConnection connection = statement.getConnection();
-            ConnectionQueryServices services = connection.getQueryServices();
-            int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
-            int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
-            boolean isAutoCommit = connection.getAutoCommit();
-            byte[][] values = new byte[columnIndexes.length][];
-            int rowCount = 0;
-            Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
-            PTable table = tableRef.getTable();
-            ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
+    private static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
+            ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes) throws SQLException {
+        PhoenixStatement statement = childContext.getStatement();
+        PhoenixConnection connection = statement.getConnection();
+        ConnectionQueryServices services = connection.getQueryServices();
+        int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+                QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+        int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+        boolean isAutoCommit = connection.getAutoCommit();
+        byte[][] values = new byte[columnIndexes.length][];
+        int rowCount = 0;
+        Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+        PTable table = tableRef.getTable();
+        try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
             while (rs.next()) {
                 for (int i = 0; i < values.length; i++) {
                     PColumn column = table.getColumns().get(columnIndexes[i]);
-                    byte[] bytes = rs.getBytes(i+1);
+                    byte[] bytes = rs.getBytes(i + 1);
                     ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
-                    Object value = rs.getObject(i+1);
-                    int rsPrecision = rs.getMetaData().getPrecision(i+1);
+                    Object value = rs.getObject(i + 1);
+                    int rsPrecision = rs.getMetaData().getPrecision(i + 1);
                     Integer precision = rsPrecision == 0 ? null : rsPrecision;
-                    int rsScale = rs.getMetaData().getScale(i+1);
+                    int rsScale = rs.getMetaData().getScale(i + 1);
                     Integer scale = rsScale == 0 ? null : rsScale;
                     // We are guaranteed that the two column will have compatible types,
                     // as we checked that before.
-                    if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
-                            precision, scale,
-                            column.getMaxLength(),column.getScale())) {
-                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY)
-                            .setColumnName(column.getName().getString())
-                            .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build().buildException();
-                    }
-                    column.getDataType().coerceBytes(ptr, value, column.getDataType(),
-                            precision, scale, SortOrder.getDefault(),
-                            column.getMaxLength(), column.getScale(), column.getSortOrder());
+                    if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale,
+                            column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
+                            SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
+                            .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
+                            .buildException(); }
+                    column.getDataType().coerceBytes(ptr, value, column.getDataType(), precision, scale,
+                            SortOrder.getDefault(), column.getMaxLength(), column.getScale(), column.getSortOrder());
                     values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                 }
                 setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement);
@@ -169,8 +166,6 @@ public class UpsertCompiler {
             }
             // If auto commit is true, this last batch will be committed upon return
             return new MutationState(tableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
-        } finally {
-            iterator.close();
         }
     }
 
@@ -186,14 +181,21 @@ public class UpsertCompiler {
         }
 
         @Override
-        protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
-            if (context.getSequenceManager().getSequenceCount() > 0) {
+        protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
+            if (parentContext.getSequenceManager().getSequenceCount() > 0) {
                 throw new IllegalStateException("Cannot pipeline upsert when sequence is referenced");
             }
             PhoenixStatement statement = new PhoenixStatement(connection);
+            /*
+             * We don't want to collect any read metrics within the child context, because any read metrics that
+             * need to be captured are already being collected in the parent statement context, which is enclosed in
+             * the result iterator used for reading rows out.
+             */
+            StatementContext childContext = new StatementContext(statement, false);
             // Clone the row projector as it's not thread safe and would be used simultaneously by
             // multiple threads otherwise.
-            return upsertSelect(statement, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes);
+            MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes);
+            return state;
         }
         
         public void setRowProjector(RowProjector projector) {
@@ -669,7 +671,7 @@ public class UpsertCompiler {
                 public MutationState execute() throws SQLException {
                     ResultIterator iterator = queryPlan.iterator();
                     if (parallelIteratorFactory == null) {
-                        return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
+                        return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
                     }
                     try {
                         parallelIteratorFactory.setRowProjector(projector);
@@ -677,13 +679,21 @@ public class UpsertCompiler {
                         parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
                         Tuple tuple;
                         long totalRowCount = 0;
+                        StatementContext context = queryPlan.getContext();
                         while ((tuple=iterator.next()) != null) {// Runs query
                             Cell kv = tuple.getValue(0);
                             totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
                         }
                         // Return total number of rows that have been updated. In the case of auto commit being off
                         // the mutations will all be in the mutation state of the current connection.
-                        return new MutationState(maxSize, statement.getConnection(), totalRowCount);
+                        MutationState mutationState = new MutationState(maxSize, statement.getConnection(), totalRowCount);
+                        /*
+                         *  All the metrics collected for measuring the reads done by the parallel mutating iterators
+                         *  are included in the read metrics queue of the statement context. Include these metrics in the
+                         *  returned mutation state so they can be published on commit.
+                         */
+                        mutationState.setReadMetricQueue(context.getReadMetricsQueue());
+                        return mutationState; 
                     } finally {
                         iterator.close();
                     }

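With the read metric queue attached to the mutation state as above, the reads an UPSERT SELECT performs become visible through the connection once the mutations commit, which is what testMetricsForUpsertSelectSameTable in the test file above verifies. A hedged usage sketch (table T is illustrative):

    PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
    conn.setAutoCommit(false);
    conn.createStatement().executeUpdate("UPSERT INTO T SELECT * FROM T");
    conn.commit();
    // table name -> (metric name -> value) for reads done on behalf of the mutation
    Map<String, Map<String, Long>> readMetrics =
            PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pconn);
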
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index ba137f8..00e843d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -102,7 +102,7 @@ public class AggregatePlan extends BaseQueryPlan {
             this.services = services;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
             Expression expression = RowKeyExpression.INSTANCE;
             OrderByExpression orderByExpression = new OrderByExpression(expression, false, true);
             int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
@@ -119,9 +119,9 @@ public class AggregatePlan extends BaseQueryPlan {
             this.outerFactory = outerFactory;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
-            PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan);
-            return outerFactory.newIterator(context, iterator, scan);
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
+            PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan, tableName);
+            return outerFactory.newIterator(context, iterator, scan, tableName);
         }
     }
 

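The signature change above ripples through every ParallelIteratorFactory implementation: newIterator now also receives the name of the table the scan runs against, so metrics can be attributed per table. A minimal conforming implementation might look like the following; LookAheadResultIterator.wrap is used as a plausible pass-through, and the exact wrapping is an assumption rather than something this patch prescribes:

    import java.sql.SQLException;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.compile.StatementContext;
    import org.apache.phoenix.iterate.LookAheadResultIterator;
    import org.apache.phoenix.iterate.ParallelIteratorFactory;
    import org.apache.phoenix.iterate.PeekingResultIterator;
    import org.apache.phoenix.iterate.ResultIterator;

    public class PassThroughIteratorFactory implements ParallelIteratorFactory {
        @Override
        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner,
                Scan scan, String tableName) throws SQLException {
            // tableName identifies the physical table for metric attribution;
            // this pass-through does nothing with it.
            return LookAheadResultIterator.wrap(scanner);
        }
    }
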

[3/4] phoenix git commit: PHOENIX-1819 Build a framework to capture and report phoenix client side request level metrics

Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 857a952..57fa25a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.execute;
 
+import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INSTANCE;
 import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
 
 import java.sql.SQLException;
@@ -54,6 +55,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
@@ -140,6 +142,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
                 public Object getJobId() {
                     return HashJoinPlan.this;
                 }
+
+                @Override
+                public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+                    return NO_OP_INSTANCE;
+                }
             }));
         }
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 9ffa135..0de7aa3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -17,6 +17,10 @@
  */
 package org.apache.phoenix.execute;
 
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Arrays;
@@ -39,7 +43,11 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.monitoring.PhoenixMetrics;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.phoenix.monitoring.MutationMetricQueue;
+import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric;
+import org.apache.phoenix.monitoring.MutationMetricQueue.NoOpMutationMetricsQueue;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
@@ -65,9 +73,6 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.sun.istack.NotNull;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BYTES;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BATCH_SIZE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_COMMIT_TIME;
 
 /**
  * 
@@ -85,11 +90,17 @@ public class MutationState implements SQLCloseable {
     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
     private long sizeOffset;
     private int numRows = 0;
+    private final MutationMetricQueue mutationMetricQueue;
+    private ReadMetricQueue readMetricQueue;
     
-    MutationState(long maxSize, PhoenixConnection connection, Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations) {
+    MutationState(long maxSize, PhoenixConnection connection,
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) {
         this.maxSize = maxSize;
         this.connection = connection;
         this.mutations = mutations;
+        boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
+        this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
+                : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
     }
 
     public MutationState(long maxSize, PhoenixConnection connection) {
@@ -108,6 +119,12 @@ public class MutationState implements SQLCloseable {
         throwIfTooBig();
     }
     
+    public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) {
+        MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap());
+        state.sizeOffset = 0;
+        return state;
+    }
+    
     private void throwIfTooBig() {
         if (numRows > maxSize) {
             // TODO: throw SQLException ?
@@ -120,17 +137,18 @@ public class MutationState implements SQLCloseable {
     }
     
     /**
-     * Combine a newer mutation with this one, where in the event of overlaps,
-     * the newer one will take precedence.
-     * @param newMutation the newer mutation
+     * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take precedence.
+     * Combine any metrics collected for the newer mutation.
+     * 
+     * @param newMutationState the newer mutation state
      */
-    public void join(MutationState newMutation) {
-        if (this == newMutation) { // Doesn't make sense
+    public void join(MutationState newMutationState) {
+        if (this == newMutationState) { // Doesn't make sense
             return;
         }
-        this.sizeOffset += newMutation.sizeOffset;
+        this.sizeOffset += newMutationState.sizeOffset;
         // Merge newMutation with this one, keeping state from newMutation for any overlaps
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutation.mutations.entrySet()) {
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutationState.mutations.entrySet()) {
             // Replace existing entries for the table with new entries
             TableRef tableRef = entry.getKey();
             PTable table = tableRef.getTable();
@@ -168,6 +186,12 @@ public class MutationState implements SQLCloseable {
                 }
             }
         }
+        mutationMetricQueue.combineMetricQueues(newMutationState.mutationMetricQueue);
+        if (readMetricQueue == null) {
+            readMetricQueue = newMutationState.readMetricQueue;
+        } else if (newMutationState.readMetricQueue != null) {
+            readMetricQueue.combineReadMetrics(newMutationState.readMetricQueue);
+        }
         throwIfTooBig();
     }
     
@@ -332,18 +356,15 @@ public class MutationState implements SQLCloseable {
         return timeStamps;
     }
     
-    private static void logMutationSize(HTableInterface htable, List<Mutation> mutations, PhoenixConnection connection) {
+    private static long calculateMutationSize(List<Mutation> mutations) {
         long byteSize = 0;
-        int keyValueCount = 0;
-        if (PhoenixMetrics.isMetricsEnabled() || logger.isDebugEnabled()) {
+        if (GlobalClientMetrics.isMetricsEnabled()) {
             for (Mutation mutation : mutations) {
                 byteSize += mutation.heapSize();
             }
-            MUTATION_BYTES.update(byteSize);
-            if (logger.isDebugEnabled()) {
-                logger.debug(LogUtil.addCustomAnnotations("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + keyValueCount + " key values of total size " + byteSize + " bytes", connection));
-            }
         }
+        GLOBAL_MUTATION_BYTES.update(byteSize);
+        return byteSize;
     }
     
     @SuppressWarnings("deprecation")
@@ -352,126 +373,134 @@ public class MutationState implements SQLCloseable {
         byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
         long[] serverTimeStamps = validate();
         Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
-
         // add tracing for this operation
-        TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
-        Span span = trace.getSpan();
-        while (iterator.hasNext()) {
-            Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
-            Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
-            TableRef tableRef = entry.getKey();
-            PTable table = tableRef.getTable();
-            table.getIndexMaintainers(tempPtr, connection);
-            boolean hasIndexMaintainers = tempPtr.getLength() > 0;
-            boolean isDataTable = true;
-            long serverTimestamp = serverTimeStamps[i++];
-            Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
-            while (mutationsIterator.hasNext()) {
-                Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
-                byte[] htableName = pair.getFirst();
-                List<Mutation> mutations = pair.getSecond();
-                
-                //create a span per target table
-                //TODO maybe we can be smarter about the table name to string here?
-                Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
+        try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
+            Span span = trace.getSpan();
+            while (iterator.hasNext()) {
+                Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
+                // at this point we are going through mutations for each table
 
-                int retryCount = 0;
-                boolean shouldRetry = false;
-                do {
-                    ServerCache cache = null;
-                    if (hasIndexMaintainers && isDataTable) {
-                        byte[] attribValue = null;
-                        byte[] uuidValue;
-                        if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
-                            IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-                            cache = client.addIndexMetadataCache(mutations, tempPtr);
-                            child.addTimelineAnnotation("Updated index metadata cache");
-                            uuidValue = cache.getId();
-                            // If we haven't retried yet, retry for this case only, as it's possible that
-                            // a split will occur after we send the index metadata cache to all known
-                            // region servers.
-                            shouldRetry = true;
-                        } else {
-                            attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr);
-                            uuidValue = ServerCacheClient.generateId();
-                        }
-                        // Either set the UUID to be able to access the index metadata from the cache
-                        // or set the index metadata directly on the Mutation
-                        for (Mutation mutation : mutations) {
-                            if (tenantId != null) {
-                                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
-                            }
-                            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                            if (attribValue != null) {
-                                mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                            }
-                        }
-                    }
-                    
-                    SQLException sqlE = null;
-                    HTableInterface hTable = connection.getQueryServices().getTable(htableName);
-                    try {
-                        logMutationSize(hTable, mutations, connection);
-                        MUTATION_BATCH_SIZE.update(mutations.size());
-                        long startTime = System.currentTimeMillis();
-                        child.addTimelineAnnotation("Attempt " + retryCount);
-                        hTable.batch(mutations);
-                        child.stop();
-                        long duration = System.currentTimeMillis() - startTime;
-                        MUTATION_COMMIT_TIME.update(duration);
-                        shouldRetry = false;
-                        if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of  " + mutations.size() + " mutations into " + table.getName().getString() + ": " + duration + " ms", connection));
-                    } catch (Exception e) {
-                        SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
-                        if (inferredE != null) {
-                            if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
-                                // Swallow this exception once, as it's possible that we split after sending the index metadata
-                                // and one of the region servers doesn't have it. This will cause it to have it the next go around.
-                                // If it fails again, we don't retry.
-                                String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
-                                logger.warn(LogUtil.addCustomAnnotations(msg, connection));
-                                connection.getQueryServices().clearTableRegionCache(htableName);
+                Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
+                // above is the mutations for a table: the map key is the row key and the value holds that row's column values.
 
-                                // add a new child span as this one failed
-                                child.addTimelineAnnotation(msg);
-                                child.stop();
-                                child = Tracing.child(span,"Failed batch, attempting retry");
+                TableRef tableRef = entry.getKey();
+                PTable table = tableRef.getTable();
+                table.getIndexMaintainers(tempPtr, connection);
+                boolean hasIndexMaintainers = tempPtr.getLength() > 0;
+                boolean isDataTable = true;
+                long serverTimestamp = serverTimeStamps[i++];
+                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
+                // above returns an iterator of pairs where the first part is the physical HBase table name and the second part is the list of mutations for that table
+                while (mutationsIterator.hasNext()) {
+                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
+                    byte[] htableName = pair.getFirst();
+                    List<Mutation> mutations = pair.getSecond();
 
-                                continue;
+                    //create a span per target table
+                    //TODO maybe we can be smarter about the table name to string here?
+                    Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
+
+                    int retryCount = 0;
+                    boolean shouldRetry = false;
+                    do {
+                        ServerCache cache = null;
+                        if (hasIndexMaintainers && isDataTable) {
+                            byte[] attribValue = null;
+                            byte[] uuidValue;
+                            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
+                                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+                                cache = client.addIndexMetadataCache(mutations, tempPtr);
+                                child.addTimelineAnnotation("Updated index metadata cache");
+                                uuidValue = cache.getId();
+                                // If we haven't retried yet, retry for this case only, as it's possible that
+                                // a split will occur after we send the index metadata cache to all known
+                                // region servers.
+                                shouldRetry = true;
+                            } else {
+                                attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr);
+                                uuidValue = ServerCacheClient.generateId();
+                            }
+                            // Either set the UUID to be able to access the index metadata from the cache
+                            // or set the index metadata directly on the Mutation
+                            for (Mutation mutation : mutations) {
+                                if (tenantId != null) {
+                                    mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+                                }
+                                mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                if (attribValue != null) {
+                                    mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                                }
                             }
-                            e = inferredE;
                         }
-                        sqlE = new CommitException(e, getUncommittedSattementIndexes());
-                    } finally {
+
+                        SQLException sqlE = null;
+                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
                         try {
-                            hTable.close();
-                        } catch (IOException e) {
-                            if (sqlE != null) {
-                                sqlE.setNextException(ServerUtil.parseServerException(e));
-                            } else {
-                                sqlE = ServerUtil.parseServerException(e);
+                            long numMutations = mutations.size();
+                            GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
+                            
+                            long startTime = System.currentTimeMillis();
+                            child.addTimelineAnnotation("Attempt " + retryCount);
+                            hTable.batch(mutations);
+                            child.stop();
+                            shouldRetry = false;
+                            long mutationCommitTime = System.currentTimeMillis() - startTime;
+                            GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
+                            
+                            long mutationSizeBytes = calculateMutationSize(mutations);
+                            MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
+                            mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
+                        } catch (Exception e) {
+                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
+                            if (inferredE != null) {
+                                if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+                                    // Swallow this exception once, as it's possible that we split after sending the index metadata
+                                    // and one of the region servers doesn't have it. The retry gives that server a chance to load it the next go around.
+                                    // If it fails again, we don't retry.
+                                    String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
+                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+                                    connection.getQueryServices().clearTableRegionCache(htableName);
+
+                                    // add a new child span as this one failed
+                                    child.addTimelineAnnotation(msg);
+                                    child.stop();
+                                    child = Tracing.child(span,"Failed batch, attempting retry");
+
+                                    continue;
+                                }
+                                e = inferredE;
                             }
+                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
                         } finally {
                             try {
-                                if (cache != null) {
-                                    cache.close();
+                                hTable.close();
+                            } catch (IOException e) {
+                                if (sqlE != null) {
+                                    sqlE.setNextException(ServerUtil.parseServerException(e));
+                                } else {
+                                    sqlE = ServerUtil.parseServerException(e);
                                 }
                             } finally {
-                                if (sqlE != null) {
-                                    throw sqlE;
+                                try {
+                                    if (cache != null) {
+                                        cache.close();
+                                    }
+                                } finally {
+                                    if (sqlE != null) {
+                                        throw sqlE;
+                                    }
                                 }
                             }
                         }
-                    }
-                } while (shouldRetry && retryCount++ < 1);
-                isDataTable = false;
-            }
-            if (tableRef.getTable().getType() != PTableType.INDEX) {
-                numRows -= entry.getValue().size();
+                    } while (shouldRetry && retryCount++ < 1);
+                    isDataTable = false;
+                }
+                if (tableRef.getTable().getType() != PTableType.INDEX) {
+                    numRows -= entry.getValue().size();
+                }
+                iterator.remove(); // Remove batches as we process them
             }
-            iterator.remove(); // Remove batches as we process them
         }
-        trace.close();
         assert(numRows==0);
         assert(this.mutations.isEmpty());
     }
@@ -481,7 +510,7 @@ public class MutationState implements SQLCloseable {
         numRows = 0;
     }
     
-    private int[] getUncommittedSattementIndexes() {
+    private int[] getUncommittedStatementIndexes() {
     	int[] result = new int[0];
     	for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) {
     		for (RowMutationState rowMutationState : rowMutations.values()) {
@@ -533,12 +562,23 @@ public class MutationState implements SQLCloseable {
         int[] getStatementIndexes() {
             return statementIndexes;
         }
-        
+
         void join(RowMutationState newRow) {
             getColumnValues().putAll(newRow.getColumnValues());
             statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
         }
-        
+    }
+    
+    public ReadMetricQueue getReadMetricQueue() {
+        return readMetricQueue;
+    }
 
+    public void setReadMetricQueue(ReadMetricQueue readMetricQueue) {
+        this.readMetricQueue = readMetricQueue;
     }
+
+    public MutationMetricQueue getMutationMetricQueue() {
+        return mutationMetricQueue;
+    }
+    
 }
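
The commit path above records a MutationMetric per physical HBase table for every batch it sends, and the new accessors expose the queues to the connection. A minimal sketch of how those pieces compose, assuming a no-arg MutationMetricQueue constructor (only addMetricsForTable, the three-argument MutationMetric constructor, and aggregate() appear in this patch):

    // Hedged sketch: one MutationMetric is recorded per table per batch attempt.
    MutationMetricQueue queue = new MutationMetricQueue(); // no-arg ctor assumed
    queue.addMetricsForTable("MY_TABLE",
            new MutationMetric(100L /* mutations */, 4096L /* bytes */, 12L /* commit ms */));
    // aggregate() (used by PhoenixConnection.getMutationMetrics() below) rolls the
    // per-batch records up into Map<physicalTableName, Map<metricName, value>>.
    Map<String, Map<String, Long>> aggregated = queue.aggregate();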

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 031b58b..2bed3a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -49,7 +49,7 @@ public class UnionPlan implements QueryPlan {
     private final FilterableStatement statement;
     private final ParameterMetaData paramMetaData;
     private final OrderBy orderBy;
-    private final StatementContext context;
+    private final StatementContext parentContext;
     private final Integer limit;
     private final GroupBy groupBy;
     private final RowProjector projector;
@@ -59,7 +59,7 @@ public class UnionPlan implements QueryPlan {
 
     public UnionPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
             Integer limit, OrderBy orderBy, GroupBy groupBy, List<QueryPlan> plans, ParameterMetaData paramMetaData) throws SQLException {
-        this.context = context;
+        this.parentContext = context;
         this.statement = statement;
         this.tableRef = table;
         this.projector = projector;
@@ -128,7 +128,7 @@ public class UnionPlan implements QueryPlan {
     }
 
     public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
-        this.iterators = new UnionResultIterators(plans);
+        this.iterators = new UnionResultIterators(plans, parentContext);
         ResultIterator scanner;      
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
 
@@ -175,7 +175,7 @@ public class UnionPlan implements QueryPlan {
 
     @Override
     public StatementContext getContext() {
-        return context;
+        return parentContext;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 6a3847b..43731cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -18,8 +18,8 @@
 package org.apache.phoenix.iterate;
 
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_TIMEOUT;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
 import java.sql.SQLException;
@@ -540,12 +540,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     } catch (ExecutionException e) {
                         try { // Rethrow as SQLException
                             throw ServerUtil.parseServerException(e);
-                        } catch (StaleRegionBoundaryCacheException e2) { 
+                        } catch (StaleRegionBoundaryCacheException e2) {
                             // Catch only to try to recover from region boundary cache being out of date
                             List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
                             if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
                                 services.clearTableRegionCache(physicalTableName);
                                 clearedCache = true;
+                                context.getOverallQueryMetrics().cacheRefreshedDueToSplits();
                             }
                             // Resubmit just this portion of work again
                             Scan oldScan = scanPair.getFirst();
@@ -582,7 +583,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             success = true;
             return iterators;
         } catch (TimeoutException e) {
-            QUERY_TIMEOUT.increment();
+            context.getOverallQueryMetrics().queryTimedOut();
+            GLOBAL_QUERY_TIMEOUT_COUNTER.increment();
             // thrown when a thread times out waiting for the future.get() call to return
             toThrow = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT)
                     .setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms")
@@ -616,7 +618,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 }
             } finally {
                 if (toThrow != null) {
-                    FAILED_QUERY.increment();
+                    GLOBAL_FAILED_QUERY_COUNTER.increment();
+                    context.getOverallQueryMetrics().queryFailed();
                     throw toThrow;
                 }
             }
@@ -639,7 +642,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                         if (futurePair != null) {
                             Future<PeekingResultIterator> future = futurePair.getSecond();
                             if (future != null) {
-                                cancelledWork |= future.cancel(false);
+                                future.cancel(false);
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index e1ee8db..f272e55 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -19,6 +19,7 @@
 package org.apache.phoenix.iterate;
 
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 
 import java.sql.SQLException;
 import java.util.List;
@@ -66,18 +67,17 @@ public class ChunkedResultIterator implements PeekingResultIterator {
         }
 
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
-            scanner.close(); //close the iterator since we don't need it anymore.
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
             if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
             return new ChunkedResultIterator(delegateFactory, context, tableRef, scan,
                     context.getConnection().getQueryServices().getProps().getLong(
                                         QueryServices.SCAN_RESULT_CHUNK_SIZE,
-                                        QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE));
+                                        QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner);
         }
     }
 
-    public ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory,
-            StatementContext context, TableRef tableRef, Scan scan, long chunkSize) throws SQLException {
+    private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory,
+            StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner) throws SQLException {
         this.delegateIteratorFactory = delegateIteratorFactory;
         this.context = context;
         this.tableRef = tableRef;
@@ -87,9 +87,9 @@ public class ChunkedResultIterator implements PeekingResultIterator {
         // to get parallel scans kicked off in separate threads. If we delay this,
         // we'll get serialized behavior (see PHOENIX-
         if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
-        ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
-                new TableResultIterator(context, tableRef, scan), chunkSize);
-        resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
+        ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(scanner, chunkSize);
+        String tableName = tableRef.getTable().getPhysicalName().getString();
+        resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName);
     }
 
     @Override
@@ -118,9 +118,10 @@ public class ChunkedResultIterator implements PeekingResultIterator {
             scan = ScanUtil.newScan(scan);
             scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
             if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
+            String tableName = tableRef.getTable().getPhysicalName().getString();
             ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
-                    new TableResultIterator(context, tableRef, scan), chunkSize);
-            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
+                    new TableResultIterator(context, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName)), chunkSize);
+            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName);
         }
         return resultIterator;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
index df8f658..f25e373 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
@@ -25,10 +25,10 @@ import org.apache.phoenix.compile.StatementContext;
 public interface ParallelIteratorFactory {
     public static ParallelIteratorFactory NOOP_FACTORY = new ParallelIteratorFactory() {
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan)
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName)
                 throws SQLException {
             return LookAheadResultIterator.wrap(scanner);
         }
     };
-    PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
+    PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException;
 }
\ No newline at end of file
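
With the physical table name threaded through newIterator, factory implementations can attribute per-table request metrics. A sketch of an implementation against the revised contract; like NOOP_FACTORY it ignores the new argument, which metric-aware factories (e.g. SpoolingResultIteratorFactory below) use to allot metrics:

    ParallelIteratorFactory factory = new ParallelIteratorFactory() {
        @Override
        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner,
                Scan scan, String physicalTableName) throws SQLException {
            // physicalTableName is available here for per-table metric attribution.
            return LookAheadResultIterator.wrap(scanner);
        }
    };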

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index be10c20..2dfbfe3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS;
 
 import java.sql.SQLException;
 import java.util.Collections;
@@ -30,6 +30,10 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.ScanUtil;
@@ -79,19 +83,25 @@ public class ParallelIterators extends BaseResultIterators {
         // Shuffle so that we start execution across many machines
         // before we fill up the thread pool
         Collections.shuffle(scanLocations);
-        PARALLEL_SCANS.update(scanLocations.size());
+        ReadMetricQueue readMetrics = context.getReadMetricsQueue();
+        final String physicalTableName = tableRef.getTable().getPhysicalName().getString();
+        int numScans = scanLocations.size();
+        context.getOverallQueryMetrics().updateNumParallelScans(numScans);
+        GLOBAL_NUM_PARALLEL_SCANS.update(numScans);
         for (ScanLocator scanLocation : scanLocations) {
             final Scan scan = scanLocation.getScan();
+            final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
+            final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
-
+                
                 @Override
                 public PeekingResultIterator call() throws Exception {
                     long startTime = System.currentTimeMillis();
-                    ResultIterator scanner = new TableResultIterator(context, tableRef, scan);
+                    ResultIterator scanner = new TableResultIterator(context, tableRef, scan, scanMetrics);
                     if (logger.isDebugEnabled()) {
                         logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
                     }
-                    PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan);
+                    PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan, physicalTableName);
                     
                     // Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed.
                     iterator.peek();
@@ -109,6 +119,11 @@ public class ParallelIterators extends BaseResultIterators {
                 public Object getJobId() {
                     return ParallelIterators.this;
                 }
+
+                @Override
+                public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+                    return taskMetrics;
+                }
             }, "Parallel scanner for table: " + tableRef.getTable().getName().getString()));
             // Add our future in the right place so that we can concatenate the
             // results of the inner futures versus merge sorting across all of them.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
index 4a9ad3e..92ac570 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
@@ -18,7 +18,7 @@
 package org.apache.phoenix.iterate;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -268,7 +268,7 @@ public class RoundRobinResultIterator implements ResultIterator {
                 }
             } finally {
                 if (toThrow != null) {
-                    FAILED_QUERY.increment();
+                    GLOBAL_FAILED_QUERY_COUNTER.increment();
                     throw toThrow;
                 }
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index fd65d0c..b722794 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -28,15 +28,20 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.phoenix.monitoring.PhoenixMetrics;
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ServerUtil;
 
 public class ScanningResultIterator implements ResultIterator {
     private final ResultScanner scanner;
-    public ScanningResultIterator(ResultScanner scanner) {
+    private final CombinableMetric scanMetrics;
+    
+    public ScanningResultIterator(ResultScanner scanner, CombinableMetric scanMetrics) {
         this.scanner = scanner;
+        this.scanMetrics = scanMetrics;
     }
     
     @Override
@@ -66,17 +71,18 @@ public class ScanningResultIterator implements ResultIterator {
 		return "ScanningResultIterator [scanner=" + scanner + "]";
 	}
 	
-	private static void calculateScanSize(Result result) {
-	    if (PhoenixMetrics.isMetricsEnabled()) {
-	        if (result != null) {
-	            Cell[] cells = result.rawCells();
-	            long scanResultSize = 0;
-	            for (Cell cell : cells) {
-	                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-	                scanResultSize += kv.heapSize();
-	            }
-	            SCAN_BYTES.update(scanResultSize);
-	        }
-	    }
-	}
+    private void calculateScanSize(Result result) {
+        if (GlobalClientMetrics.isMetricsEnabled() || scanMetrics != NoOpRequestMetric.INSTANCE) {
+            if (result != null) {
+                Cell[] cells = result.rawCells();
+                long scanResultSize = 0;
+                for (Cell cell : cells) {
+                    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+                    scanResultSize += kv.heapSize();
+                }
+                scanMetrics.change(scanResultSize);
+                GLOBAL_SCAN_BYTES.update(scanResultSize);
+            }
+        }
+    }
 }
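
The rewritten calculateScanSize shows the dual-accounting pattern this patch applies throughout: the JVM-wide global metric is always updated, while the request-level metric is a no-op placeholder (NoOpRequestMetric.INSTANCE) unless request metrics were enabled. The same logic, isolated as a sketch (assumes a Result and an allotted CombinableMetric in scope):

    long scanResultSize = 0;
    for (Cell cell : result.rawCells()) {
        scanResultSize += KeyValueUtil.ensureKeyValue(cell).heapSize();
    }
    scanMetrics.change(scanResultSize);       // request-level; no-op when disabled
    GLOBAL_SCAN_BYTES.update(scanResultSize); // global, JVM-wide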

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 6b3b5e3..516d73e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.iterate;
 
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
@@ -29,11 +31,9 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.iterate.TableResultIterator.ScannerCreation;
 import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.trace.util.Tracing;
-import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.ScanUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -48,7 +48,6 @@ import com.google.common.collect.Lists;
  * @since 0.1
  */
 public class SerialIterators extends BaseResultIterators {
-	private static final Logger logger = LoggerFactory.getLogger(SerialIterators.class);
 	private static final String NAME = "SERIAL";
     private final ParallelIteratorFactory iteratorFactory;
     
@@ -74,18 +73,15 @@ public class SerialIterators extends BaseResultIterators {
             Scan lastScan = scans.get(scans.size()-1);
             final Scan overallScan = ScanUtil.newScan(firstScan);
             overallScan.setStopRow(lastScan.getStopRow());
+            final String tableName = tableRef.getTable().getPhysicalName().getString();
+            final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
-
                 @Override
                 public PeekingResultIterator call() throws Exception {
                 	List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size());
                 	for (final Scan scan : scans) {
-	                    long startTime = System.currentTimeMillis();
-	                    ResultIterator scanner = new TableResultIterator(context, tableRef, scan, ScannerCreation.DELAYED);
-	                    if (logger.isDebugEnabled()) {
-	                        logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
-	                    }
-	                    concatIterators.add(iteratorFactory.newIterator(context, scanner, scan));
+                	    ResultIterator scanner = new TableResultIterator(context, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), ScannerCreation.DELAYED);
+                	    concatIterators.add(iteratorFactory.newIterator(context, scanner, scan, tableName));
                 	}
                 	PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
                     allIterators.add(concatIterator);
@@ -101,6 +97,11 @@ public class SerialIterators extends BaseResultIterators {
                 public Object getJobId() {
                     return SerialIterators.this;
                 }
+
+                @Override
+                public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+                    return taskMetrics;
+                }
             }, "Serial scanner for table: " + tableRef.getTable().getName().getString()));
             // Add our singleton Future which will execute serially
             nestedFutures.add(Collections.singletonList(new Pair<Scan,Future<PeekingResultIterator>>(overallScan,future)));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index f49bce5..540b410 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -17,6 +17,11 @@
  */
 package org.apache.phoenix.iterate;
 
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MEMORY_WAIT_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_SIZE;
+
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -34,6 +39,9 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.monitoring.MemoryMetricsHolder;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.SpoolingMetricsHolder;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.tuple.ResultTuple;
@@ -42,8 +50,6 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ResultUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TupleUtil;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.NUM_SPOOL_FILE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SPOOL_FILE_SIZE;
 
 
 
@@ -56,7 +62,10 @@ import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SPOOL_FILE
  * @since 0.1
  */
 public class SpoolingResultIterator implements PeekingResultIterator {
+    
     private final PeekingResultIterator spoolFrom;
+    private final SpoolingMetricsHolder spoolMetrics;
+    private final MemoryMetricsHolder memoryMetrics;
 
     public static class SpoolingResultIteratorFactory implements ParallelIteratorFactory {
         private final QueryServices services;
@@ -65,14 +74,16 @@ public class SpoolingResultIterator implements PeekingResultIterator {
             this.services = services;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
-            return new SpoolingResultIterator(scanner, services);
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException {
+            ReadMetricQueue readRequestMetric = context.getReadMetricsQueue();
+            SpoolingMetricsHolder spoolMetrics = new SpoolingMetricsHolder(readRequestMetric, physicalTableName);
+            MemoryMetricsHolder memoryMetrics = new MemoryMetricsHolder(readRequestMetric, physicalTableName);
+            return new SpoolingResultIterator(spoolMetrics, memoryMetrics, scanner, services);
         }
-
     }
 
-    public SpoolingResultIterator(ResultIterator scanner, QueryServices services) throws SQLException {
-        this (scanner, services.getMemoryManager(),
+    private SpoolingResultIterator(SpoolingMetricsHolder spoolMetrics, MemoryMetricsHolder memoryMetrics, ResultIterator scanner, QueryServices services) throws SQLException {
+        this (spoolMetrics, memoryMetrics, scanner, services.getMemoryManager(),
                 services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES),
                 services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES),
                 services.getProps().get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY));
@@ -87,9 +98,15 @@ public class SpoolingResultIterator implements PeekingResultIterator {
     *  the memory manager) is exceeded.
     * @throws SQLException
     */
-    SpoolingResultIterator(ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException {
+    SpoolingResultIterator(SpoolingMetricsHolder sMetrics, MemoryMetricsHolder mMetrics, ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException {
+        this.spoolMetrics = sMetrics;
+        this.memoryMetrics = mMetrics;
         boolean success = false;
+        long startTime = System.currentTimeMillis();
         final MemoryChunk chunk = mm.allocate(0, thresholdBytes);
+        long waitTime = System.currentTimeMillis() - startTime;
+        GLOBAL_MEMORY_WAIT_TIME.update(waitTime);
+        memoryMetrics.getMemoryWaitTimeMetric().change(waitTime);
         DeferredFileOutputStream spoolTo = null;
         try {
             // Can't be bigger than int, since it's the max of the above allocation
@@ -97,8 +114,11 @@ public class SpoolingResultIterator implements PeekingResultIterator {
             spoolTo = new DeferredFileOutputStream(size, "ResultSpooler",".bin", new File(spoolDirectory)) {
                 @Override
                 protected void thresholdReached() throws IOException {
-                    super.thresholdReached();
-                    chunk.close();
+                    try {
+                        super.thresholdReached();
+                    } finally {
+                        chunk.close();
+                    }
                 }
             };
             DataOutputStream out = new DataOutputStream(spoolTo);
@@ -116,9 +136,14 @@ public class SpoolingResultIterator implements PeekingResultIterator {
                 byte[] data = spoolTo.getData();
                 chunk.resize(data.length);
                 spoolFrom = new InMemoryResultIterator(data, chunk);
+                GLOBAL_MEMORY_CHUNK_BYTES.update(data.length);
+                memoryMetrics.getMemoryChunkSizeMetric().change(data.length);
             } else {
-                NUM_SPOOL_FILE.increment();
-                SPOOL_FILE_SIZE.update(spoolTo.getFile().length());
+                long sizeOfSpoolFile = spoolTo.getFile().length();
+                GLOBAL_SPOOL_FILE_SIZE.update(sizeOfSpoolFile);
+                GLOBAL_SPOOL_FILE_COUNTER.increment();
+                spoolMetrics.getNumSpoolFileMetric().increment();
+                spoolMetrics.getSpoolFileSizeMetric().change(sizeOfSpoolFile);
                 spoolFrom = new OnDiskResultIterator(spoolTo.getFile());
             }
             success = true;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index ea13dfd..6f040d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.Closeables;
@@ -44,9 +45,10 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
     private final Scan scan;
     private final HTableInterface htable;
     private volatile ResultIterator delegate;
-
-    public TableResultIterator(StatementContext context, TableRef tableRef) throws SQLException {
-        this(context, tableRef, context.getScan());
+    private final CombinableMetric scanMetrics;
+    
+    public TableResultIterator(StatementContext context, TableRef tableRef, CombinableMetric scanMetrics) throws SQLException {
+        this(context, tableRef, context.getScan(), scanMetrics);
     }
 
     /*
@@ -62,7 +64,7 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
                 delegate = this.delegate;
                 if (delegate == null) {
                     try {
-                        this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan));
+                        this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan), scanMetrics);
                     } catch (IOException e) {
                         Closeables.closeQuietly(htable);
                         throw ServerUtil.parseServerException(e);
@@ -73,13 +75,14 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
         return delegate;
     }
     
-    public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan) throws SQLException {
-        this(context, tableRef, scan, ScannerCreation.IMMEDIATE);
+    public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, CombinableMetric scanMetrics) throws SQLException {
+        this(context, tableRef, scan, scanMetrics, ScannerCreation.IMMEDIATE);
     }
 
-    public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, ScannerCreation creationMode) throws SQLException {
+    public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, ScannerCreation creationMode) throws SQLException {
         super(context, tableRef);
         this.scan = scan;
+        this.scanMetrics = scanMetrics;
         htable = context.getConnection().getQueryServices().getTable(tableRef.getTable().getPhysicalName().getBytes());
         if (creationMode == ScannerCreation.IMMEDIATE) {
         	getDelegate(false);
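
Callers now allot a per-table SCAN_BYTES metric before constructing the iterator, as ChunkedResultIterator and SerialIterators do above. The calling convention, sketched (allotMetric and MetricType.SCAN_BYTES as used elsewhere in this patch):

    String tableName = tableRef.getTable().getPhysicalName().getString();
    CombinableMetric scanBytes =
            context.getReadMetricsQueue().allotMetric(MetricType.SCAN_BYTES, tableName);
    ResultIterator scanner = new TableResultIterator(context, tableRef, scan, scanBytes);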

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java
index b7c8b21..2296982 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java
@@ -22,6 +22,9 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.monitoring.OverAllQueryMetrics;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.util.ServerUtil;
 
@@ -39,14 +42,22 @@ public class UnionResultIterators implements ResultIterators {
     private final List<List<Scan>> scans;
     private final List<PeekingResultIterator> iterators;
     private final List<QueryPlan> plans;
-
-    public UnionResultIterators(List<QueryPlan> plans) throws SQLException {
+    private final List<ReadMetricQueue> readMetricsList;
+    private final List<OverAllQueryMetrics> overAllQueryMetricsList;
+    private boolean closed;
+    private final StatementContext parentStmtCtx;
+    public UnionResultIterators(List<QueryPlan> plans, StatementContext parentStmtCtx) throws SQLException {
+        this.parentStmtCtx = parentStmtCtx;
         this.plans = plans;
         int nPlans = plans.size();
         iterators = Lists.newArrayListWithExpectedSize(nPlans);
         splits = Lists.newArrayListWithExpectedSize(nPlans * 30); 
         scans = Lists.newArrayListWithExpectedSize(nPlans * 10); 
+        readMetricsList = Lists.newArrayListWithCapacity(nPlans);
+        overAllQueryMetricsList = Lists.newArrayListWithCapacity(nPlans);
         for (QueryPlan plan : this.plans) {
+            readMetricsList.add(plan.getContext().getReadMetricsQueue());
+            overAllQueryMetricsList.add(plan.getContext().getOverallQueryMetrics());
             iterators.add(LookAheadResultIterator.wrap(plan.iterator()));
             splits.addAll(plan.getSplits()); 
             scans.addAll(plan.getScans());
@@ -59,32 +70,47 @@ public class UnionResultIterators implements ResultIterators {
     }
 
     @Override
-    public void close() throws SQLException {   
-        SQLException toThrow = null;
-        try {
-            if (iterators != null) {
-                for (int index=0; index < iterators.size(); index++) {
-                    PeekingResultIterator iterator = iterators.get(index);
-                    try {
-                        iterator.close();
-                    } catch (Exception e) {
-                        if (toThrow == null) {
-                            toThrow = ServerUtil.parseServerException(e);
-                        } else {
-                            toThrow.setNextException(ServerUtil.parseServerException(e));
+    public void close() throws SQLException {
+        if (!closed) {
+            closed = true;
+            SQLException toThrow = null;
+            try {
+                if (iterators != null) {
+                    for (int index=0; index < iterators.size(); index++) {
+                        PeekingResultIterator iterator = iterators.get(index);
+                        try {
+                            iterator.close();
+                        } catch (Exception e) {
+                            if (toThrow == null) {
+                                toThrow = ServerUtil.parseServerException(e);
+                            } else {
+                                toThrow.setNextException(ServerUtil.parseServerException(e));
+                            }
                         }
                     }
                 }
-            }
-        } catch (Exception e) {
-            toThrow = ServerUtil.parseServerException(e);
-        } finally {
-            if (toThrow != null) {
-                throw toThrow;
+            } catch (Exception e) {
+                toThrow = ServerUtil.parseServerException(e);
+            } finally {
+                setMetricsInParentContext();
+                if (toThrow != null) {
+                    throw toThrow;
+                }
             }
         }
     }
-
+    
+    private void setMetricsInParentContext() {
+        ReadMetricQueue parentCtxReadMetrics = parentStmtCtx.getReadMetricsQueue();
+        for (ReadMetricQueue readMetrics : readMetricsList) {
+            parentCtxReadMetrics.combineReadMetrics(readMetrics);
+        }
+        OverAllQueryMetrics parentCtxQueryMetrics = parentStmtCtx.getOverallQueryMetrics();
+        for (OverAllQueryMetrics metric : overAllQueryMetricsList) {
+            parentCtxQueryMetrics.combine(metric);
+        }
+    }
+    
     @Override
     public List<List<Scan>> getScans() {
         return scans;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index e33e8ee..a6479f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -122,7 +122,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     private final Properties info;
     private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
     private final Map<PDataType<?>, Format> formatters = new HashMap<>();
-    private MutationState mutationState;
+    private final MutationState mutationState;
     private final int mutateBatchSize;
     private final Long scn;
     private boolean isAutoCommit = false;
@@ -136,8 +136,9 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     private boolean isClosed = false;
     private Sampler<?> sampler;
     private boolean readOnly = false;
-    private Map<String, String> customTracingAnnotations = emptyMap(); 
- 
+    private Map<String, String> customTracingAnnotations = emptyMap();
+    private final boolean isRequestLevelMetricsEnabled;
+    
     static {
         Tracing.addTraceMetricsSource();
     }
@@ -232,6 +233,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
                          ! Objects.equal(tenantId, function.getTenantId()));
             }
         };
+        this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, this.services.getProps());
         this.mutationState = newMutationState(maxSize);
         this.metaData = metaData.pruneTables(pruner);
         this.metaData = metaData.pruneFunctions(pruner);
@@ -433,6 +435,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
             return;
         }
         try {
+            clearMetrics();
             try {
                 if (traceScope != null) {
                     traceScope.close();
@@ -853,4 +856,23 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     public void setTraceScope(TraceScope traceScope) {
         this.traceScope = traceScope;
     }
+    
+    public Map<String, Map<String, Long>> getMutationMetrics() {
+        return mutationState.getMutationMetricQueue().aggregate();
+    }
+    
+    public Map<String, Map<String, Long>> getReadMetrics() {
+        return mutationState.getReadMetricQueue() != null ? mutationState.getReadMetricQueue().aggregate() : Collections.<String, Map<String, Long>>emptyMap();
+    }
+    
+    public boolean isRequestLevelMetricsEnabled() {
+        return isRequestLevelMetricsEnabled;
+    }
+    
+    public void clearMetrics() {
+        mutationState.getMutationMetricQueue().clearMetrics();
+        if (mutationState.getReadMetricQueue() != null) {
+            mutationState.getReadMetricQueue().clearMetrics();
+        }
+    }
 }
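
Together with the RequestMetric connection property, the new accessors give clients per-connection visibility into write metrics. A hedged, self-contained sketch: the JDBC URL and table T are placeholders, and the property value "true" assumes JDBCUtil parses it as a boolean; getMutationMetrics(), getReadMetrics(), and clearMetrics() are the methods added above.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class RequestMetricsExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Opt this connection into request-level metric collection.
            props.setProperty(PhoenixRuntime.REQUEST_METRIC_ATTRIB, "true");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
                try (Statement stmt = conn.createStatement()) {
                    stmt.executeUpdate("UPSERT INTO T VALUES (1, 'a')");
                }
                conn.commit();
                PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
                // physical table name -> (metric name -> value)
                for (Map.Entry<String, Map<String, Long>> e : pconn.getMutationMetrics().entrySet()) {
                    System.out.println(e.getKey() + " => " + e.getValue());
                }
                pconn.clearMetrics(); // close() also clears metrics, per the hunk above
            }
        }
    }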

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index d1b3b27..2dd8af4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.ExpressionProjector;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -311,7 +312,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14");
     
     PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
-        this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new PhoenixStatement(connection));
+        this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
         this.connection = connection;
     }
 
@@ -509,11 +510,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
                 public PhoenixStatement newStatement(PhoenixConnection connection) {
                     return new PhoenixStatement(connection) {
                         @Override
-                        protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector)
-                                throws SQLException {
-                            return new PhoenixResultSet(
-                                    new TenantColumnFilteringIterator(iterator, projector),
-                                    projector, this);
+                        protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector,
+                                StatementContext context) throws SQLException {
+                            return new PhoenixResultSet(new TenantColumnFilteringIterator(iterator, projector),
+                                    projector, context);
                         }
                     };
                 }
@@ -523,7 +523,12 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
         }
         return stmt.executeQuery(buf.toString());
     }
-
+    
+//    private ColumnResolver getColumnResolverForCatalogTable() throws SQLException {
+//        TableRef tableRef = new TableRef(getTable(connection, SYSTEM_CATALOG_NAME));
+//        return FromCompiler.getResolver(tableRef);
+//    }
+    
     /**
      * Filters the tenant id column out of a column metadata result set (thus, where each row is a column definition).
      * The tenant id is by definition the first column of the primary key, but the primary key does not necessarily
@@ -1007,7 +1012,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     }
     @Override
     public ResultSet getTableTypes() throws SQLException {
-        return new PhoenixResultSet(new MaterializedResultIterator(TABLE_TYPE_TUPLES), TABLE_TYPE_ROW_PROJECTOR, new PhoenixStatement(connection));
+        return new PhoenixResultSet(new MaterializedResultIterator(TABLE_TYPE_TUPLES), TABLE_TYPE_ROW_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
     }
 
     /**
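
Note that PhoenixResultSet now requires a StatementContext instead of a bare
statement; the boolean passed to StatementContext appears to control whether
request-level metrics are collected. A sketch of building an empty result set
the way this file now does ('connection' is an open PhoenixConnection):

    PhoenixStatement stmt = new PhoenixStatement(connection);
    StatementContext ctx = new StatementContext(stmt, false); // no request metrics
    PhoenixResultSet emptyRs = new PhoenixResultSet(
            ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, ctx);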

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 8ee56ea..da06370 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -39,16 +39,21 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.text.Format;
 import java.util.Calendar;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.monitoring.OverAllQueryMetrics;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PBoolean;
@@ -109,18 +114,25 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
     private final ResultIterator scanner;
     private final RowProjector rowProjector;
     private final PhoenixStatement statement;
+    private final StatementContext context;
+    private final ReadMetricQueue readMetricsQueue;
+    private final OverAllQueryMetrics overAllQueryMetrics;
     private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 
     private Tuple currentRow = BEFORE_FIRST;
     private boolean isClosed = false;
     private boolean wasNull = false;
-
-    public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, PhoenixStatement statement) throws SQLException {
+    private boolean firstRecordRead = false;
+    
+    public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, StatementContext ctx) throws SQLException {
         this.rowProjector = rowProjector;
         this.scanner = resultIterator;
-        this.statement = statement;
+        this.context = ctx;
+        this.statement = context.getStatement();
+        this.readMetricsQueue = context.getReadMetricsQueue();
+        this.overAllQueryMetrics = context.getOverallQueryMetrics();
     }
-
+    
     @Override
     public boolean absolute(int row) throws SQLException {
         throw new SQLFeatureNotSupportedException();
@@ -147,14 +159,14 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
 
     @Override
     public void close() throws SQLException {
-        if (isClosed) {
-            return;
-        }
+        if (isClosed) { return; }
         try {
             scanner.close();
         } finally {
             isClosed = true;
             statement.getResultSets().remove(this);
+            overAllQueryMetrics.endQuery();
+            overAllQueryMetrics.stopResultSetWatch();
         }
     }
 
@@ -754,6 +766,10 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
     public boolean next() throws SQLException {
         checkOpen();
         try {
+            if (!firstRecordRead) {
+                firstRecordRead = true;
+                overAllQueryMetrics.startResultSetWatch();
+            }
             currentRow = scanner.next();
             rowProjector.reset();
         } catch (RuntimeException e) {
@@ -764,6 +780,10 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
             }
             throw e;
         }
+        if (currentRow == null) {
+            overAllQueryMetrics.endQuery();
+            overAllQueryMetrics.stopResultSetWatch();
+        }
         return currentRow != null;
     }
 
@@ -1261,4 +1281,18 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
     public ResultIterator getUnderlyingIterator() {
         return scanner;
     }
+    
+    public Map<String, Map<String, Long>> getReadMetrics() {
+        return readMetricsQueue.aggregate();
+    }
+
+    public Map<String, Long> getOverAllRequestReadMetrics() {
+        return overAllQueryMetrics.publish();
+    }
+    
+    public void resetMetrics() {
+        readMetricsQueue.clearMetrics();
+        overAllQueryMetrics.reset();
+    }
+
 }
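
With the context wired through, per-query read metrics can be pulled straight
off a result set once it has been drained. A sketch, with the URL and query as
illustrative assumptions:

    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
         ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM METRICS_TEST")) {
        while (rs.next()) {
            // consume rows; the result set watch stops when next() returns false
        }
        // Statements from a Phoenix connection hand back PhoenixResultSet instances.
        PhoenixResultSet prs = (PhoenixResultSet) rs;
        Map<String, Map<String, Long>> readMetrics = prs.getReadMetrics();     // per table
        Map<String, Long> overallMetrics = prs.getOverAllRequestReadMetrics(); // per query
        prs.resetMetrics();
    }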

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 51fb027..9b67c22 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -17,9 +17,9 @@
  */
 package org.apache.phoenix.jdbc;
 
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.MUTATION_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.QUERY_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
 
 import java.io.IOException;
 import java.io.Reader;
@@ -213,8 +213,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
         return resultSets;
     }
     
-    protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector) throws SQLException {
-        return new PhoenixResultSet(iterator, projector, this);
+    protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector, StatementContext context) throws SQLException {
+        return new PhoenixResultSet(iterator, projector, context);
     }
     
     protected boolean execute(final CompilableStatement stmt) throws SQLException {
@@ -232,7 +232,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
     }
     
     protected PhoenixResultSet executeQuery(final CompilableStatement stmt) throws SQLException {
-        QUERY_COUNT.increment();
+        GLOBAL_SELECT_SQL_COUNTER.increment();
         try {
             return CallRunner.run(
                 new CallRunner.CallableThrowable<PhoenixResultSet, SQLException>() {
@@ -250,7 +250,9 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                             String explainPlan = QueryUtil.getExplainPlan(resultIterator);
                             logger.debug(LogUtil.addCustomAnnotations("Explain plan: " + explainPlan, connection));
                         }
-                        PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector());
+                        StatementContext context = plan.getContext();
+                        context.getOverallQueryMetrics().startQuery();
+                        PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), context);
                         resultSets.add(rs);
                         setLastQueryPlan(plan);
                         setLastResultSet(rs);
@@ -269,7 +271,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                         // Regardless of whether the query was successfully handled or not, 
                         // update the time spent so far. If needed, we can separate out the
                         // success times and failure times.
-                        QUERY_TIME.update(System.currentTimeMillis() - startTime);
+                        GLOBAL_QUERY_TIME.update(System.currentTimeMillis() - startTime);
                     }
                 }
                 }, PhoenixContextExecutor.inContext());
@@ -285,7 +287,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 SQLExceptionCode.READ_ONLY_CONNECTION).
                 build().buildException();
         }
-	    MUTATION_COUNT.increment();
+	    GLOBAL_MUTATION_SQL_COUNTER.increment();
         try {
             return CallRunner
                     .run(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
index 31ef742..7406e46 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
@@ -17,11 +17,11 @@
  */
 package org.apache.phoenix.job;
 
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.REJECTED_TASK_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.TASK_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_END_TO_END_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_EXECUTION_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_QUEUE_WAIT_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_QUEUE_WAIT_TIME;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -36,6 +36,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.annotation.Nullable;
+
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
+
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 /**
  * 
@@ -63,6 +67,7 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
 
     public static interface JobRunnable<T> extends Runnable {
         public Object getJobId();
+        public TaskExecutionMetricsHolder getTaskExecutionMetric();
     }
 
     public static ThreadPoolExecutor createThreadPoolExec(int keepAliveMs, int size, int queueSize, boolean useInstrumentedThreadPool) {
@@ -117,13 +122,17 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
      */
     static class JobFutureTask<T> extends FutureTask<T> {
         private final Object jobId;
+        @Nullable
+        private final TaskExecutionMetricsHolder taskMetric;
         
         public JobFutureTask(Runnable r, T t) {
             super(r, t);
             if(r instanceof JobRunnable){
               	this.jobId = ((JobRunnable)r).getJobId();
+              	this.taskMetric = ((JobRunnable)r).getTaskExecutionMetric();
             } else {
             	this.jobId = this;
+            	this.taskMetric = null;
             }
         }
         
@@ -132,8 +141,10 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
             // FIXME: this fails when executor used by hbase
             if (c instanceof JobCallable) {
                 this.jobId = ((JobCallable<T>) c).getJobId();
+                this.taskMetric = ((JobCallable<T>) c).getTaskExecutionMetric();
             } else {
                 this.jobId = this;
+                this.taskMetric = null;
             }
         }
         
@@ -187,6 +198,7 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
      */
     public static interface JobCallable<T> extends Callable<T> {
         public Object getJobId();
+        public TaskExecutionMetricsHolder getTaskExecutionMetric();
     }
 
 
@@ -224,27 +236,40 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
         private final RejectedExecutionHandler rejectedExecHandler = new RejectedExecutionHandler() {
             @Override
             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-                REJECTED_TASK_COUNT.increment();
+                TaskExecutionMetricsHolder metrics = getRequestMetric(r);
+                if (metrics != null) {
+                    metrics.getNumRejectedTasks().increment();
+                }
+                GLOBAL_REJECTED_TASK_COUNTER.increment();
                 throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString());
             }
         };
 
-        public InstrumentedThreadPoolExecutor(String threadPoolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
-                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+        public InstrumentedThreadPoolExecutor(String threadPoolName, int corePoolSize, int maximumPoolSize,
+                long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
             setRejectedExecutionHandler(rejectedExecHandler);
         }
 
         @Override
         public void execute(Runnable task) {
-            TASK_COUNT.increment();
+            TaskExecutionMetricsHolder metrics = getRequestMetric(task);
+            if (metrics != null) {
+                metrics.getNumTasks().increment();
+            }
+            GLOBAL_TASK_EXECUTED_COUNTER.increment();
             super.execute(task);
         }
 
         @Override
         protected void beforeExecute(Thread worker, Runnable task) {
             InstrumentedJobFutureTask instrumentedTask = (InstrumentedJobFutureTask)task;
-            TASK_QUEUE_WAIT_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime());
+            long queueWaitTime = System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime();
+            GLOBAL_TASK_QUEUE_WAIT_TIME.update(queueWaitTime);
+            TaskExecutionMetricsHolder metrics = getRequestMetric(task);
+            if (metrics != null) {
+                metrics.getTaskQueueWaitTime().change(queueWaitTime);
+            }
             super.beforeExecute(worker, instrumentedTask);
         }
 
@@ -254,10 +279,21 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
             try {
                 super.afterExecute(instrumentedTask, t);
             } finally {
-                TASK_EXECUTION_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskExecutionStartTime());
-                TASK_END_TO_END_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime());
+                long taskExecutionTime = System.currentTimeMillis() - instrumentedTask.getTaskExecutionStartTime();
+                long endToEndTaskTime = System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime();
+                TaskExecutionMetricsHolder metrics = getRequestMetric(task);
+                if (metrics != null) {
+                    metrics.getTaskExecutionTime().change(taskExecutionTime);
+                    metrics.getTaskEndToEndTime().change(endToEndTaskTime);
+                }
+                GLOBAL_TASK_EXECUTION_TIME.update(taskExecutionTime);
+                GLOBAL_TASK_END_TO_END_TIME.update(endToEndTaskTime);
             }
         }
+
+        private static TaskExecutionMetricsHolder getRequestMetric(Runnable task) {
+            return ((JobFutureTask)task).taskMetric;
+        }
     }
 }
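
A JobCallable (or JobRunnable) must now hand the pool a
TaskExecutionMetricsHolder so queue-wait and execution times can be attributed
to the originating request. A sketch; the (ReadMetricQueue, tableName)
constructor is assumed here by analogy with MemoryMetricsHolder, and
"MY_TABLE" is illustrative:

    ReadMetricQueue readMetrics = new ReadMetricQueue(true);
    final TaskExecutionMetricsHolder taskMetrics =
            new TaskExecutionMetricsHolder(readMetrics, "MY_TABLE"); // assumed ctor
    JobManager.JobCallable<Long> job = new JobManager.JobCallable<Long>() {
        @Override
        public Long call() {
            return 42L; // the actual unit of work
        }
        @Override
        public Object getJobId() {
            return this; // tasks sharing a job id are round-robined together
        }
        @Override
        public TaskExecutionMetricsHolder getTaskExecutionMetric() {
            return taskMetrics;
        }
    };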
 


[2/4] phoenix git commit: PHOENIX-1819 Build a framework to capture and report phoenix client side request level metrics

Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 5270277..bb4054b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -57,6 +57,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.job.JobManager;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -255,12 +256,9 @@ public class CsvBulkLoadTool extends Configured implements Tool {
         }
         
         List<Future<Boolean>> runningJobs = new ArrayList<Future<Boolean>>();
-        boolean useInstrumentedPool = conn
-                .unwrap(PhoenixConnection.class)
-                .getQueryServices()
-                .getProps()
-                .getBoolean(QueryServices.METRICS_ENABLED,
-                        QueryServicesOptions.DEFAULT_IS_METRICS_ENABLED);
+        boolean useInstrumentedPool = GlobalClientMetrics.isMetricsEnabled()
+                || conn.unwrap(PhoenixConnection.class).isRequestLevelMetricsEnabled();
+                        
         ExecutorService executor =
                 JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20, useInstrumentedPool);
         try{

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index eb6dc3d..b500a25 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
@@ -32,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
 import org.apache.phoenix.iterate.PeekingResultIterator;
@@ -40,6 +43,7 @@ import org.apache.phoenix.iterate.RoundRobinResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -100,8 +104,12 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
         final List<Scan> scans = pSplit.getScans();
         try {
             List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+            StatementContext ctx = queryPlan.getContext();
+            ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
+            String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
             for (Scan scan : scans) {
-                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(), scan);
+                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(),
+                        queryPlan.getTableRef(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName));
                 PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
                 iterators.add(peekingResultIterator);
             }
@@ -112,7 +120,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
             this.resultIterator = iterator;
             // Clone the row projector as it's not thread safe and would be used simultaneously by
             // multiple threads otherwise.
-            this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector().cloneIfNecessary(),queryPlan.getContext().getStatement());
+            this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
         } catch (SQLException e) {
             LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
             Throwables.propagate(e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
index 02c1dea..79b49c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
@@ -17,9 +17,6 @@
  */
 package org.apache.phoenix.memory;
 
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MEMORY_MANAGER_BYTES;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MEMORY_WAIT_TIME;
-
 import org.apache.http.annotation.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,8 +89,6 @@ public class GlobalMemoryManager implements MemoryManager {
             }
             usedMemoryBytes += nBytes;
         }
-        MEMORY_WAIT_TIME.update(System.currentTimeMillis() - startTimeMs);
-        MEMORY_MANAGER_BYTES.update(nBytes);
         return nBytes;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
new file mode 100644
index 0000000..796e8ba
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Version of {@link Metric} that can be used when the metric is being concurrently accessed or modified by multiple
+ * threads.
+ */
+public class AtomicMetric implements Metric {
+
+    private final MetricType type;
+    private final AtomicLong value = new AtomicLong();
+
+    public AtomicMetric(MetricType type) {
+        this.type = type;
+    }
+
+    @Override
+    public String getName() {
+        return type.name();
+    }
+
+    @Override
+    public String getDescription() {
+        return type.description();
+    }
+
+    @Override
+    public long getValue() {
+        return value.get();
+    }
+
+    @Override
+    public void change(long delta) {
+        value.addAndGet(delta);
+    }
+
+    @Override
+    public void increment() {
+        value.incrementAndGet();
+    }
+
+    @Override
+    public String getCurrentMetricState() {
+        return getName() + ": " + value.get();
+    }
+
+    @Override
+    public void reset() {
+        value.set(0);
+    }
+
+}
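
A quick sketch of the thread-safety contract: multiple workers may bump the
same AtomicMetric without external locking (fragment; the enclosing method is
assumed to declare InterruptedException):

    final Metric scanBytes = new AtomicMetric(MetricType.SCAN_BYTES);
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 100; i++) {
        pool.execute(new Runnable() {
            @Override
            public void run() {
                scanBytes.change(1024); // each worker adds the bytes it scanned
            }
        });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println(scanBytes.getCurrentMetricState()); // "SCAN_BYTES: 102400"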

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
new file mode 100644
index 0000000..7ebb0c1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+
+
+/**
+ * Interface for representing a metric that could be published and possibly combined with a metric of the same
+ * type.
+ */
+public interface CombinableMetric extends Metric {
+
+    String getPublishString();
+
+    CombinableMetric combine(CombinableMetric metric);
+
+    public class NoOpRequestMetric implements CombinableMetric {
+
+        public static NoOpRequestMetric INSTANCE = new NoOpRequestMetric();
+        private static final String EMPTY_STRING = "";
+
+        @Override
+        public String getName() {
+            return EMPTY_STRING;
+        }
+
+        @Override
+        public String getDescription() {
+            return EMPTY_STRING;
+        }
+
+        @Override
+        public long getValue() {
+            return 0;
+        }
+
+        @Override
+        public void change(long delta) {}
+
+        @Override
+        public void increment() {}
+
+        @Override
+        public String getCurrentMetricState() {
+            return EMPTY_STRING;
+        }
+
+        @Override
+        public void reset() {}
+
+        @Override
+        public String getPublishString() {
+            return EMPTY_STRING;
+        }
+
+        @Override
+        public CombinableMetric combine(CombinableMetric metric) {
+            return INSTANCE;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
new file mode 100644
index 0000000..fa6f7d3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class CombinableMetricImpl implements CombinableMetric {
+
+    private final Metric metric;
+
+    public CombinableMetricImpl(MetricType type) {
+        metric = new NonAtomicMetric(type);
+    }
+
+    @Override
+    public String getName() {
+        return metric.getName();
+    }
+
+    @Override
+    public String getDescription() {
+        return metric.getDescription();
+    }
+
+    @Override
+    public long getValue() {
+        return metric.getValue();
+    }
+
+    @Override
+    public void change(long delta) {
+        metric.change(delta);
+    }
+
+    @Override
+    public void increment() {
+        metric.increment();
+    }
+
+    @Override
+    public String getCurrentMetricState() {
+        return metric.getCurrentMetricState();
+    }
+
+    @Override
+    public void reset() {
+        metric.reset();
+    }
+
+    @Override
+    public String getPublishString() {
+        return getCurrentMetricState();
+    }
+
+    @Override
+    public CombinableMetric combine(CombinableMetric metric) {
+        checkArgument(this.getClass().equals(metric.getClass()));
+        this.metric.change(metric.getValue());
+        return this;
+    }
+
+}
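
combine() folds another metric of the same class into this one, which is how
per-scan values get rolled up into a single request-level number. A sketch,
assuming NonAtomicMetric renders its state the same way AtomicMetric does:

    CombinableMetric m1 = new CombinableMetricImpl(MetricType.SCAN_BYTES);
    CombinableMetric m2 = new CombinableMetricImpl(MetricType.SCAN_BYTES);
    m1.change(500);
    m2.change(700);
    CombinableMetric merged = m1.combine(m2); // returns m1, now holding 1200
    System.out.println(merged.getPublishString()); // e.g. "SCAN_BYTES: 1200"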

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
deleted file mode 100644
index 141294d..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.monitoring;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Incrementing only counter that keeps track of the 
- * number of occurrences of something.
- * 
- */
-@ThreadSafe
-class Counter implements Metric {
-    
-    private final AtomicLong counter;
-    private final String name;
-    private final String description;
-    
-    public Counter(String name, String description) {
-        this.name = name;
-        this.description = description;
-        this.counter = new AtomicLong(0);
-    }
-    
-    public long increment() {
-        return counter.incrementAndGet();
-    }
-    
-    public long getCurrentCount() {
-        return counter.get();
-    }
-    
-    @Override
-    public String getName() {
-        return name;
-    }
-    
-    @Override
-    public String getDescription() {
-        return description;
-    }
-    
-    @Override
-    public void reset() {
-        counter.set(0);
-    }
-    
-    @Override
-    public String toString() {
-        return "Name: " + name + ", Current count: " + counter.get();
-    }
-    
-    @Override
-    public String getCurrentMetricState() {
-        return toString();
-    }
-
-    @Override
-    public long getNumberOfSamples() {
-        return getCurrentCount();
-    }
-
-    @Override
-    public long getTotalSum() {
-        return getCurrentCount();
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
new file mode 100644
index 0000000..a8f3bb4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.phoenix.query.QueryServicesOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Central place where we keep track of all the global client phoenix metrics. These metrics are different from
+ * {@link ReadMetricQueue} or {@link MutationMetricQueue} in that they are collected at the client JVM level, as opposed
+ * to the above two, which are collected for every phoenix request.
+ */
+
+public enum GlobalClientMetrics {
+    
+    GLOBAL_MUTATION_BATCH_SIZE(MUTATION_BATCH_SIZE),
+    GLOBAL_MUTATION_BYTES(MUTATION_BYTES),
+    GLOBAL_MUTATION_COMMIT_TIME(MUTATION_COMMIT_TIME),
+    GLOBAL_QUERY_TIME(QUERY_TIME),
+    GLOBAL_NUM_PARALLEL_SCANS(NUM_PARALLEL_SCANS),
+    GLOBAL_SCAN_BYTES(SCAN_BYTES),
+    GLOBAL_SPOOL_FILE_SIZE(SPOOL_FILE_SIZE),
+    GLOBAL_MEMORY_CHUNK_BYTES(MEMORY_CHUNK_BYTES),
+    GLOBAL_MEMORY_WAIT_TIME(MEMORY_WAIT_TIME),
+    GLOBAL_TASK_QUEUE_WAIT_TIME(TASK_QUEUE_WAIT_TIME),
+    GLOBAL_TASK_END_TO_END_TIME(TASK_END_TO_END_TIME),
+    GLOBAL_TASK_EXECUTION_TIME(TASK_EXECUTION_TIME),
+    GLOBAL_MUTATION_SQL_COUNTER(MUTATION_SQL_COUNTER),
+    GLOBAL_SELECT_SQL_COUNTER(SELECT_SQL_COUNTER),
+    GLOBAL_TASK_EXECUTED_COUNTER(TASK_EXECUTED_COUNTER),
+    GLOBAL_REJECTED_TASK_COUNTER(TASK_REJECTED_COUNTER),
+    GLOBAL_QUERY_TIMEOUT_COUNTER(QUERY_TIMEOUT_COUNTER),
+    GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER),
+    GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER);
+    
+    private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
+    private GlobalMetric metric;
+
+    public void update(long value) {
+        if (isGlobalMetricsEnabled) {
+            metric.change(value);
+        }
+    }
+
+    @VisibleForTesting
+    public GlobalMetric getMetric() {
+        return metric;
+    }
+
+    @Override
+    public String toString() {
+        return metric.toString();
+    }
+
+    private GlobalClientMetrics(MetricType metricType) {
+        this.metric = new GlobalMetricImpl(metricType);
+    }
+
+    public void increment() {
+        if (isGlobalMetricsEnabled) {
+            metric.increment();
+        }
+    }
+
+    public static Collection<GlobalMetric> getMetrics() {
+        List<GlobalMetric> metrics = new ArrayList<>();
+        for (GlobalClientMetrics m : GlobalClientMetrics.values()) {
+            metrics.add(m.metric);
+        }
+        return metrics;
+    }
+
+    public static boolean isMetricsEnabled() {
+        return isGlobalMetricsEnabled;
+    }
+
+}
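
Because every update is guarded by isGlobalMetricsEnabled, call sites can
update unconditionally; a periodic reporting pass might look like this sketch:

    // Record a 35 ms query, as PhoenixStatement.executeQuery() now does;
    // this is a no-op when global metrics are disabled.
    GlobalClientMetrics.GLOBAL_QUERY_TIME.update(35);

    if (GlobalClientMetrics.isMetricsEnabled()) {
        for (GlobalMetric m : GlobalClientMetrics.getMetrics()) {
            System.out.println(m.getCurrentMetricState());
            m.reset(); // begin a new collection interval
        }
    }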

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java
new file mode 100644
index 0000000..f3b562f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+/**
+ * Interface that exposes the various internal phoenix metrics collected
+ * at the JVM level. Because metrics are dynamic in nature, it is not
+ * guaranteed that the exposed states will always be in sync with each other.
+ * Use these metrics primarily for monitoring and debugging purposes.
+ */
+public interface GlobalMetric extends Metric {
+    
+    /**
+     * @return Number of samples collected since the last {@link #reset()} call.
+     */
+    public long getNumberOfSamples();
+    
+    /**
+     * @return Sum of the values of the metric sampled since the last {@link #reset()} call.
+     */
+    public long getTotalSum();
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
new file mode 100644
index 0000000..26a16e1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class GlobalMetricImpl implements GlobalMetric {
+
+    private AtomicLong numberOfSamples = new AtomicLong(0);
+    private Metric metric;
+
+    public GlobalMetricImpl(MetricType type) {
+        this.metric = new AtomicMetric(type);
+    }
+
+    /**
+     * Reset the internal state. Typically called after metric information has been collected and a new phase of
+     * collection is being requested for the next interval.
+     */
+    @Override
+    public void reset() {
+        metric.reset();
+        numberOfSamples.set(0);
+    }
+
+    @Override
+    public long getNumberOfSamples() {
+        return numberOfSamples.get();
+    }
+
+    @Override
+    public long getTotalSum() {
+        return metric.getValue();
+    }
+
+    @Override
+    public void change(long delta) {
+        metric.change(delta);
+        numberOfSamples.incrementAndGet();
+    }
+
+    @Override
+    public void increment() {
+        metric.increment();
+        numberOfSamples.incrementAndGet();
+    }
+
+    @Override
+    public String getName() {
+        return metric.getName();
+    }
+
+    @Override
+    public String getDescription() {
+        return metric.getDescription();
+    }
+
+    @Override
+    public long getValue() {
+        return metric.getValue();
+    }
+
+    @Override
+    public String getCurrentMetricState() {
+        return metric.getCurrentMetricState() + ", Number of samples: " + numberOfSamples.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
new file mode 100644
index 0000000..0e82ce4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
+
+/**
+ * Class that encapsulates the metrics regarding memory resources needed for servicing a request.
+ */
+public class MemoryMetricsHolder {
+    private final CombinableMetric memoryChunkSizeMetric;
+    private final CombinableMetric memoryWaitTimeMetric;
+    public static final MemoryMetricsHolder NO_OP_INSTANCE = new MemoryMetricsHolder(new ReadMetricQueue(false), null);
+    
+    public MemoryMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+        this.memoryChunkSizeMetric = readMetrics.allotMetric(MEMORY_CHUNK_BYTES, tableName);
+        this.memoryWaitTimeMetric = readMetrics.allotMetric(MEMORY_WAIT_TIME, tableName);
+    }
+
+    public CombinableMetric getMemoryChunkSizeMetric() {
+        return memoryChunkSizeMetric;
+    }
+
+    public CombinableMetric getMemoryWaitTimeMetric() {
+        return memoryWaitTimeMetric;
+    }
+}
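
This holder presumably takes over from the global updates removed from
GlobalMemoryManager earlier in this patch. A sketch of how an allocation path
could record the two metrics (the queue and table name are illustrative):

    ReadMetricQueue readMetrics = new ReadMetricQueue(true); // true: collect metrics
    MemoryMetricsHolder memMetrics = new MemoryMetricsHolder(readMetrics, "MY_TABLE");
    long start = System.currentTimeMillis();
    // ... wait for and allocate a chunk of memory ...
    memMetrics.getMemoryWaitTimeMetric().change(System.currentTimeMillis() - start);
    memMetrics.getMemoryChunkSizeMetric().change(4096); // bytes handed out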

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
index aef792c..1ad1c7a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
@@ -18,47 +18,46 @@
 package org.apache.phoenix.monitoring;
 
 /**
- * Interface that exposes the various internal phoenix metrics collected.
- * Because metrics are dynamic in nature, it is not guaranteed that the
- * state exposed will always be in sync with each other. One should use
- * these metrics primarily for monitoring and debugging purposes. 
+ * Interface that represents a phoenix-internal metric.
  */
 public interface Metric {
-    
     /**
-     * 
      * @return Name of the metric
      */
     public String getName();
-    
+
     /**
-     * 
      * @return Description of the metric
      */
     public String getDescription();
-    
+
     /**
-     * Reset the internal state. Typically called after
-     * metric information has been collected and a new
-     * phase of collection is being requested for the next
-     * interval.
+     * @return Current value of the metric
      */
-    public void reset();
-    
+    public long getValue();
+
     /**
+     * Change the metric by the specified amount
      * 
-     * @return String that represents the current state of the metric.
-     * Typically used to log the current state.
+     * @param delta
+     *            amount by which the metric value should be changed
      */
-    public String getCurrentMetricState();
-    
+    public void change(long delta);
+
+    /**
+     * Change the value of metric by 1
+     */
+    public void increment();
+
     /**
-     * @return Number of samples collected since the last {@link #reset()} call.
+     * @return String that represents the current state of the metric. Typically used for logging or reporting purposes.
      */
-    public long getNumberOfSamples();
+    public String getCurrentMetricState();
     
     /**
-     * @return Sum of the values of the metric sampled since the last {@link #reset()} call.
+     * Reset the metric
      */
-    public long getTotalSum();
+    public void reset();
+
 }
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
new file mode 100644
index 0000000..a0c2a4a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+public enum MetricType {
+
+    MUTATION_BATCH_SIZE("Batch sizes of mutations"),
+    MUTATION_BYTES("Size of mutations in bytes"),
+    MUTATION_COMMIT_TIME("Time it took to commit mutations"),
+    QUERY_TIME("Query times"),
+    NUM_PARALLEL_SCANS("Number of scans that were executed in parallel"),
+    SCAN_BYTES("Number of bytes read by scans"),
+    MEMORY_CHUNK_BYTES("Number of bytes allocated by the memory manager"),
+    MEMORY_WAIT_TIME("Number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
+    MUTATION_SQL_COUNTER("Counter for number of mutation sql statements"),
+    SELECT_SQL_COUNTER("Counter for number of sql queries"),
+    TASK_QUEUE_WAIT_TIME("Time in milliseconds tasks had to wait in the queue of the thread pool executor"),
+    TASK_END_TO_END_TIME("Time in milliseconds spent by tasks from creation to completion"),
+    TASK_EXECUTION_TIME("Time in milliseconds tasks took to execute"),
+    TASK_EXECUTED_COUNTER("Counter for number of tasks submitted to the thread pool executor"),
+    TASK_REJECTED_COUNTER("Counter for number of tasks that were rejected by the thread pool executor"),
+    QUERY_TIMEOUT_COUNTER("Number of times a query timed out"),
+    QUERY_FAILED_COUNTER("Number of times a query failed"),
+    SPOOL_FILE_SIZE("Size of spool files created in bytes"),
+    SPOOL_FILE_COUNTER("Number of spool files created"),
+    CACHE_REFRESH_SPLITS_COUNTER("Number of times cache was refreshed because of splits"),
+    WALL_CLOCK_TIME_MS("Wall clock time elapsed for the overall query execution"),
+    RESULT_SET_TIME_MS("Wall clock time elapsed for reading all records using resultSet.next()");
+    
+    private final String description;
+
+    private MetricType(String description) {
+        this.description = description;
+    }
+
+    public String description() {
+        return description;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java
new file mode 100644
index 0000000..bffb9ad
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * 
+ * Stopwatch that is aware of whether or not metrics collection is enabled.
+ * If metrics are disabled it does nothing. Otherwise, it delegates calls
+ * to a {@code Stopwatch}.
+ *
+ */
+final class MetricsStopWatch {
+    
+    private final boolean isMetricsEnabled;
+    private final Stopwatch stopwatch;
+    
+    MetricsStopWatch(boolean isMetricsEnabled) {
+        this.isMetricsEnabled = isMetricsEnabled;
+        this.stopwatch = new Stopwatch();
+    }
+    
+    void start()  {
+        if (isMetricsEnabled) {
+            stopwatch.start();
+        }
+    }
+    
+    void stop() {
+        if (isMetricsEnabled) {
+            if (stopwatch.isRunning()) {
+                stopwatch.stop();
+            }
+        }
+    }
+    
+    long getElapsedTimeInMs() {
+        if (isMetricsEnabled) {
+            return stopwatch.elapsedMillis();
+        }
+        return 0;
+    }
+}
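
The class follows a simple guard pattern: every call is a no-op unless metrics
are enabled. A minimal sketch of the intended usage, placed in the same package
since the class is package-private (the sketch class itself is hypothetical):

    package org.apache.phoenix.monitoring;

    public class StopWatchSketch {
        public static void main(String[] args) throws InterruptedException {
            MetricsStopWatch watch = new MetricsStopWatch(true /* metrics enabled */);
            watch.start();
            Thread.sleep(5); // stand-in for the work being timed
            watch.stop();
            System.out.println(watch.getElapsedTimeInMs()); // would be 0 with metrics disabled
        }
    }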

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
new file mode 100644
index 0000000..e90da46
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Queue that tracks the various write/mutation-related Phoenix request metrics.
+ */
+public class MutationMetricQueue {
+    
+    // Map of table name -> mutation metric
+    private Map<String, MutationMetric> tableMutationMetric = new HashMap<>();
+    
+    public void addMetricsForTable(String tableName, MutationMetric metric) {
+        MutationMetric tableMetric = tableMutationMetric.get(tableName);
+        if (tableMetric == null) {
+            tableMutationMetric.put(tableName, metric);
+        } else {
+            tableMetric.combineMetric(metric);
+        }
+    }
+
+    public void combineMetricQueues(MutationMetricQueue other) {
+        Map<String, MutationMetric> tableMetricMap = other.tableMutationMetric;
+        for (Entry<String, MutationMetric> entry : tableMetricMap.entrySet()) {
+            addMetricsForTable(entry.getKey(), entry.getValue());
+        }
+    }
+    
+    /**
+     * Aggregate the metrics tracked for each table. Note that this does not clear the internal
+     * state; use {@link #clearMetrics()} for that.
+     * @return map of table name -> (metric name -> metric value)
+     */
+    public Map<String, Map<String, Long>> aggregate() {
+        Map<String, Map<String, Long>> publishedMetrics = new HashMap<>();
+        for (Entry<String, MutationMetric> entry : tableMutationMetric.entrySet()) {
+            String tableName = entry.getKey();
+            MutationMetric metric = entry.getValue();
+            Map<String, Long> publishedMetricsForTable = publishedMetrics.get(tableName);
+            if (publishedMetricsForTable == null) {
+                publishedMetricsForTable = new HashMap<>();
+                publishedMetrics.put(tableName, publishedMetricsForTable);
+            }
+            publishedMetricsForTable.put(metric.getNumMutations().getName(), metric.getNumMutations().getValue());
+            publishedMetricsForTable.put(metric.getMutationsSizeBytes().getName(), metric.getMutationsSizeBytes().getValue());
+            publishedMetricsForTable.put(metric.getCommitTimeForMutations().getName(), metric.getCommitTimeForMutations().getValue());
+        }
+        return publishedMetrics;
+    }
+    
+    public void clearMetrics() {
+        tableMutationMetric.clear(); // help gc
+    }
+    
+    /**
+     * Class that holds together the various metrics associated with mutations.
+     */
+    public static class MutationMetric {
+        private final CombinableMetric numMutations = new CombinableMetricImpl(MUTATION_BATCH_SIZE);
+        private final CombinableMetric mutationsSizeBytes = new CombinableMetricImpl(MUTATION_BYTES);
+        private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME);
+
+        public MutationMetric(long numMutations, long mutationsSizeBytes, long commitTimeForMutations) {
+            this.numMutations.change(numMutations);
+            this.mutationsSizeBytes.change(mutationsSizeBytes);
+            this.totalCommitTimeForMutations.change(commitTimeForMutations);
+        }
+
+        public CombinableMetric getCommitTimeForMutations() {
+            return totalCommitTimeForMutations;
+        }
+
+        public CombinableMetric getNumMutations() {
+            return numMutations;
+        }
+
+        public CombinableMetric getMutationsSizeBytes() {
+            return mutationsSizeBytes;
+        }
+
+        public void combineMetric(MutationMetric other) {
+            this.numMutations.combine(other.numMutations);
+            this.mutationsSizeBytes.combine(other.mutationsSizeBytes);
+            this.totalCommitTimeForMutations.combine(other.totalCommitTimeForMutations);
+        }
+
+    }
+
+    /**
+     * Class to represent a no-op mutation metrics queue. Used in places where request-level metric tracking for
+     * mutations is not needed or desired.
+     */
+    public static class NoOpMutationMetricsQueue extends MutationMetricQueue {
+
+        public static final NoOpMutationMetricsQueue NO_OP_MUTATION_METRICS_QUEUE = new NoOpMutationMetricsQueue();
+
+        private NoOpMutationMetricsQueue() {}
+
+        @Override
+        public void addMetricsForTable(String tableName, MutationMetric metric) {}
+
+        @Override
+        public Map<String, Map<String, Long>> aggregate() { return Collections.emptyMap(); }
+        
+        
+    }
+
+}
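
A minimal sketch of the intended flow, assuming a caller that has just committed
two batches against the same table (the sketch class, table name, and values are
hypothetical, not part of this patch):

    import java.util.Map;

    import org.apache.phoenix.monitoring.MutationMetricQueue;
    import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric;

    public class MutationMetricsSketch {
        public static void main(String[] args) {
            MutationMetricQueue queue = new MutationMetricQueue();
            // a batch of 100 mutations totalling 4096 bytes that took 12 ms to commit
            queue.addMetricsForTable("MY_TABLE", new MutationMetric(100, 4096, 12));
            // a second batch for the same table is combined with the first
            queue.addMetricsForTable("MY_TABLE", new MutationMetric(50, 2048, 7));
            // per-table map of metric name -> accumulated value
            Map<String, Map<String, Long>> perTable = queue.aggregate();
            System.out.println(perTable);
            queue.clearMetrics();
        }
    }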

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
new file mode 100644
index 0000000..2d92116
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+/**
+ * Version of {@link Metric} that can be used when the metric isn't being concurrently modified or accessed by
+ * multiple threads and the memory consistency effects of happens-before can be established. For example, Phoenix
+ * client-side metrics are modified/accessed by only one thread at a time. Further, the actions of threads in the
+ * Phoenix client thread pool happen-before the actions of the thread that performs the aggregation of metrics. This
+ * makes {@link NonAtomicMetric} a good fit for storing Phoenix's client-side request-level metrics.
+ */
+class NonAtomicMetric implements Metric {
+
+    private final MetricType type;
+    private long value;
+
+    public NonAtomicMetric(MetricType type) {
+        this.type = type;
+    }
+
+    @Override
+    public String getName() {
+        return type.name();
+    }
+
+    @Override
+    public String getDescription() {
+        return type.description();
+    }
+
+    @Override
+    public long getValue() {
+        return value;
+    }
+
+    @Override
+    public void change(long delta) {
+        value += delta;
+    }
+
+    @Override
+    public void increment() {
+        value++;
+    }
+
+    @Override
+    public String getCurrentMetricState() {
+        return getName() + ": " + value;
+    }
+
+    @Override
+    public void reset() {
+        value = 0;
+    }
+
+}
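
The thread-safety argument above rests on the happens-before edge the thread pool
machinery provides, e.g. Future.get() guarantees that writes made by the task are
visible to the thread that collects the result. A generic, self-contained sketch
of that pattern (the Counter class is a stand-in for the package-private
NonAtomicMetric):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class HappensBeforeSketch {
        // plain, non-atomic counter, analogous to NonAtomicMetric
        static class Counter { long value; void change(long delta) { value += delta; } }

        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            Counter counter = new Counter();
            Future<?> f = pool.submit(() -> counter.change(10)); // pool thread writes
            f.get(); // happens-before: the task's writes are now visible to this thread
            System.out.println(counter.value); // safe unsynchronized read, prints 10
            pool.shutdown();
        }
    }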

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
new file mode 100644
index 0000000..1f71542
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.CACHE_REFRESH_SPLITS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.RESULT_SET_TIME_MS;
+import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_TIME_MS;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+
+/**
+ * Class that represents the overall metrics associated with a query being executed by Phoenix.
+ */
+public class OverAllQueryMetrics {
+    private final MetricsStopWatch queryWatch;
+    private final MetricsStopWatch resultSetWatch;
+    private final CombinableMetric numParallelScans;
+    private final CombinableMetric wallClockTimeMS;
+    private final CombinableMetric resultSetTimeMS;
+    private final CombinableMetric queryTimedOut;
+    private final CombinableMetric queryFailed;
+    private final CombinableMetric cacheRefreshedDueToSplits;
+
+    public OverAllQueryMetrics(boolean isMetricsEnabled) {
+        queryWatch = new MetricsStopWatch(isMetricsEnabled);
+        resultSetWatch = new MetricsStopWatch(isMetricsEnabled);
+        numParallelScans = isMetricsEnabled ? new CombinableMetricImpl(NUM_PARALLEL_SCANS) : NoOpRequestMetric.INSTANCE;
+        wallClockTimeMS = isMetricsEnabled ? new CombinableMetricImpl(WALL_CLOCK_TIME_MS) : NoOpRequestMetric.INSTANCE;
+        resultSetTimeMS = isMetricsEnabled ? new CombinableMetricImpl(RESULT_SET_TIME_MS) : NoOpRequestMetric.INSTANCE;
+        queryTimedOut = isMetricsEnabled ? new CombinableMetricImpl(QUERY_TIMEOUT_COUNTER) : NoOpRequestMetric.INSTANCE;
+        queryFailed = isMetricsEnabled ? new CombinableMetricImpl(QUERY_FAILED_COUNTER) : NoOpRequestMetric.INSTANCE;
+        cacheRefreshedDueToSplits = isMetricsEnabled ? new CombinableMetricImpl(CACHE_REFRESH_SPLITS_COUNTER)
+                : NoOpRequestMetric.INSTANCE;
+    }
+
+    public void updateNumParallelScans(long numParallelScans) {
+        this.numParallelScans.change(numParallelScans);
+    }
+
+    public void queryTimedOut() {
+        queryTimedOut.increment();
+    }
+
+    public void queryFailed() {
+        queryFailed.increment();
+    }
+
+    public void cacheRefreshedDueToSplits() {
+        cacheRefreshedDueToSplits.increment();
+    }
+
+    public void startQuery() {
+        queryWatch.start();
+    }
+
+    public void endQuery() {
+        queryWatch.stop();
+        wallClockTimeMS.change(queryWatch.getElapsedTimeInMs());
+    }
+
+    public void startResultSetWatch() {
+        resultSetWatch.start();
+    }
+
+    public void stopResultSetWatch() {
+        resultSetWatch.stop();
+        resultSetTimeMS.change(resultSetWatch.getElapsedTimeInMs());
+    }
+
+    public Map<String, Long> publish() {
+        Map<String, Long> metricsForPublish = new HashMap<>();
+        metricsForPublish.put(numParallelScans.getName(), numParallelScans.getValue());
+        metricsForPublish.put(wallClockTimeMS.getName(), wallClockTimeMS.getValue());
+        metricsForPublish.put(resultSetTimeMS.getName(), resultSetTimeMS.getValue());
+        metricsForPublish.put(queryTimedOut.getName(), queryTimedOut.getValue());
+        metricsForPublish.put(queryFailed.getName(), queryFailed.getValue());
+        metricsForPublish.put(cacheRefreshedDueToSplits.getName(), cacheRefreshedDueToSplits.getValue());
+        return metricsForPublish;
+    }
+
+    public void reset() {
+        numParallelScans.reset();
+        wallClockTimeMS.reset();
+        resultSetTimeMS.reset();
+        queryTimedOut.reset();
+        queryFailed.reset();
+        cacheRefreshedDueToSplits.reset();
+        queryWatch.stop();
+        resultSetWatch.stop();
+    }
+
+    public OverAllQueryMetrics combine(OverAllQueryMetrics metric) {
+        cacheRefreshedDueToSplits.combine(metric.cacheRefreshedDueToSplits);
+        queryFailed.combine(metric.queryFailed);
+        queryTimedOut.combine(metric.queryTimedOut);
+        numParallelScans.combine(metric.numParallelScans);
+        return this;
+    }
+
+}
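
A minimal sketch of the expected lifecycle for a single query, assuming request
metrics are enabled (the sketch class and scan count are hypothetical):

    import java.util.Map;

    import org.apache.phoenix.monitoring.OverAllQueryMetrics;

    public class QueryLifecycleSketch {
        public static void main(String[] args) {
            OverAllQueryMetrics metrics = new OverAllQueryMetrics(true /* metrics enabled */);
            metrics.startQuery();
            metrics.updateNumParallelScans(4); // e.g. the plan issued four parallel scans
            metrics.endQuery();                // records WALL_CLOCK_TIME_MS
            metrics.startResultSetWatch();
            // ... the ResultSet would be iterated to exhaustion here ...
            metrics.stopResultSetWatch();      // records RESULT_SET_TIME_MS
            Map<String, Long> published = metrics.publish();
            System.out.println(published);
        }
    }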

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
deleted file mode 100644
index 28e2f2e..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.monitoring;
-
-/**
- * Central place where we keep track of all the internal
- * phoenix metrics that we track.
- * 
- */
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.phoenix.query.QueryServicesOptions;
-
-public class PhoenixMetrics {
-    private static final boolean isMetricsEnabled = QueryServicesOptions.withDefaults().isMetricsEnabled();
-
-    public static boolean isMetricsEnabled() {
-        return isMetricsEnabled;
-    }
-
-    public enum SizeMetric {
-        MUTATION_BATCH_SIZE("CumulativeBatchSizesOfMutations", "Cumulative batch sizes of mutations"),
-        MUTATION_BYTES("CumulativeMutationSize", "Cumulative size of mutations in bytes"),
-        MUTATION_COMMIT_TIME("CumulativeMutationTime", "Cumulative time it took to send mutations"),
-        QUERY_TIME("QueryTime", "Cumulative query times"),
-        PARALLEL_SCANS("CumulativeNumberOfParallelScans", "Cumulative number of scans executed that were executed in parallel"),
-        SCAN_BYTES("CumulativeScanBytesSize", "Cumulative number of bytes read by scans"),
-        SPOOL_FILE_SIZE("CumulativeSpoolFilesSize", "Cumulative size of spool files created in bytes"),
-        MEMORY_MANAGER_BYTES("CumulativeBytesAllocated", "Cumulative number of bytes allocated by the memory manager"),
-        MEMORY_WAIT_TIME("CumulativeMemoryWaitTime", "Cumulative number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
-        TASK_QUEUE_WAIT_TIME("CumulativeTaskQueueWaitTime", "Cumulative time in milliseconds tasks had to wait in the queue of the thread pool executor"),
-        TASK_END_TO_END_TIME("CumulativeTaskEndToEndTime", "Cumulative time in milliseconds spent by tasks from creation to completion"),
-        TASK_EXECUTION_TIME("CumulativeTaskExecutionTime", "Cumulative time in milliseconds tasks took to execute");
-
-        private final SizeStatistic metric;
-
-        private SizeMetric(String metricName, String metricDescription) {
-            metric = new SizeStatistic(metricName, metricDescription);
-        }
-
-        public void update(long value) {
-            if (isMetricsEnabled) {
-                metric.add(value);
-            }
-        }
-        
-        // exposed for testing.
-        public Metric getMetric() {
-            return metric;
-        }
-        
-        @Override
-        public String toString() {
-            return metric.toString();
-        }
-    }
-
-    public enum CountMetric {
-        MUTATION_COUNT("NumMutationCounter", "Counter for number of mutation statements"),
-        QUERY_COUNT("NumQueryCounter", "Counter for number of queries"),
-        TASK_COUNT("NumberOfTasksCounter", "Counter for number of tasks submitted to the thread pool executor"),
-        REJECTED_TASK_COUNT("RejectedTasksCounter", "Counter for number of tasks that were rejected by the thread pool executor"),
-        QUERY_TIMEOUT("QueryTimeoutCounter", "Number of times query timed out"),
-        FAILED_QUERY("QueryFailureCounter", "Number of times query failed"),
-        NUM_SPOOL_FILE("NumSpoolFilesCounter", "Number of spool files created");
-
-        private final Counter metric;
-
-        private CountMetric(String metricName, String metricDescription) {
-            metric = new Counter(metricName, metricDescription);
-        }
-
-        public void increment() {
-            if (isMetricsEnabled) {
-                metric.increment();
-            }
-        }
-        
-        // exposed for testing.
-        public Metric getMetric() {
-            return metric;
-        }
-        
-        @Override
-        public String toString() {
-            return metric.toString();
-        }
-    }
-    
-    public static Collection<Metric> getMetrics() {
-        List<Metric> metrics = new ArrayList<>();
-        for (SizeMetric s : SizeMetric.values()) {
-            metrics.add(s.metric);
-        }
-        for (CountMetric s : CountMetric.values()) {
-            metrics.add(s.metric);
-        }
-        return metrics;
-    }
-
-}    
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
new file mode 100644
index 0000000..e6c6be2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nonnull;
+
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Queue of all metrics associated with performing reads from the cluster.
+ */
+public class ReadMetricQueue {
+
+    private static final int MAX_QUEUE_SIZE = 20000; // TODO: should this be configurable?
+
+    private final ConcurrentMap<MetricKey, Queue<CombinableMetric>> metricsMap = new ConcurrentHashMap<>();
+
+    private final boolean isRequestMetricsEnabled;
+
+    public ReadMetricQueue(boolean isRequestMetricsEnabled) {
+        this.isRequestMetricsEnabled = isRequestMetricsEnabled;
+    }
+
+    public CombinableMetric allotMetric(MetricType type, String tableName) {
+        if (!isRequestMetricsEnabled) { return NoOpRequestMetric.INSTANCE; }
+        MetricKey key = new MetricKey(type, tableName);
+        Queue<CombinableMetric> q = getMetricQueue(key);
+        CombinableMetric metric = getMetric(type);
+        q.offer(metric);
+        return metric;
+    }
+
+    @VisibleForTesting
+    public CombinableMetric getMetric(MetricType type) {
+        CombinableMetric metric = new CombinableMetricImpl(type);
+        return metric;
+    }
+
+    /**
+     * @return map of table name -> (metric name -> metric value)
+     */
+    public Map<String, Map<String, Long>> aggregate() {
+        Map<String, Map<String, Long>> publishedMetrics = new HashMap<>();
+        for (Entry<MetricKey, Queue<CombinableMetric>> entry : metricsMap.entrySet()) {
+            String tableNameToPublish = entry.getKey().tableName;
+            Collection<CombinableMetric> metrics = entry.getValue();
+            if (metrics.size() > 0) {
+                CombinableMetric m = combine(metrics);
+                Map<String, Long> map = publishedMetrics.get(tableNameToPublish);
+                if (map == null) {
+                    map = new HashMap<>();
+                    publishedMetrics.put(tableNameToPublish, map);
+                }
+                map.put(m.getName(), m.getValue());
+            }
+        }
+        return publishedMetrics;
+    }
+    
+    public void clearMetrics() {
+        metricsMap.clear(); // help gc
+    }
+
+    private static CombinableMetric combine(Collection<CombinableMetric> metrics) {
+        int size = metrics.size();
+        if (size == 0) { throw new IllegalArgumentException("Metrics collection needs to have at least one element"); }
+        Iterator<CombinableMetric> itr = metrics.iterator();
+        CombinableMetric combinedMetric = itr.next();
+        while (itr.hasNext()) {
+            combinedMetric = combinedMetric.combine(itr.next());
+        }
+        return combinedMetric;
+    }
+
+    /**
+     * Combine the metrics. This method should only be called from a single thread, and only while neither metric
+     * queue is being modified.
+     */
+    public ReadMetricQueue combineReadMetrics(ReadMetricQueue other) {
+        ConcurrentMap<MetricKey, Queue<CombinableMetric>> otherMetricsMap = other.metricsMap;
+        for (Entry<MetricKey, Queue<CombinableMetric>> entry : otherMetricsMap.entrySet()) {
+            MetricKey key = entry.getKey();
+            Queue<CombinableMetric> otherQueue = entry.getValue();
+            CombinableMetric combinedMetric = null;
+            // combine the metrics corresponding to this metric key before putting it in the queue.
+            for (CombinableMetric m : otherQueue) {
+                if (combinedMetric == null) {
+                    combinedMetric = m;
+                } else {
+                    combinedMetric.combine(m);
+                }
+            }
+            if (combinedMetric != null) {
+                Queue<CombinableMetric> thisQueue = getMetricQueue(key);
+                thisQueue.offer(combinedMetric);
+            }
+        }
+        return this;
+    }
+
+    /**
+     * Inner class whose instances are used as keys in the metrics map.
+     */
+    private static class MetricKey {
+        @Nonnull
+        private final MetricType type;
+
+        @Nonnull
+        private final String tableName;
+
+        MetricKey(MetricType type, String tableName) {
+            checkNotNull(type);
+            checkNotNull(tableName);
+            this.type = type;
+            this.tableName = tableName;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + tableName.hashCode();
+            result = prime * result + type.hashCode();
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (obj == null) return false;
+            if (getClass() != obj.getClass()) return false;
+            MetricKey other = (MetricKey)obj;
+            if (tableName.equals(other.tableName) && type == other.type) return true;
+            return false;
+        }
+
+    }
+
+    private Queue<CombinableMetric> getMetricQueue(MetricKey key) {
+        Queue<CombinableMetric> q = metricsMap.get(key);
+        if (q == null) {
+            q = new LinkedBlockingQueue<CombinableMetric>(MAX_QUEUE_SIZE);
+            Queue<CombinableMetric> curQ = metricsMap.putIfAbsent(key, q);
+            if (curQ != null) {
+                q = curQ;
+            }
+        }
+        return q;
+    }
+
+}
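
A minimal sketch of how a read-path component is expected to obtain and update a
metric (the sketch class and table name are hypothetical):

    import java.util.Map;

    import org.apache.phoenix.monitoring.CombinableMetric;
    import org.apache.phoenix.monitoring.MetricType;
    import org.apache.phoenix.monitoring.ReadMetricQueue;

    public class ReadMetricsSketch {
        public static void main(String[] args) {
            ReadMetricQueue readMetrics = new ReadMetricQueue(true /* request metrics enabled */);
            // each caller gets its own metric instance, queued under the (type, table) key
            CombinableMetric scanBytes = readMetrics.allotMetric(MetricType.SCAN_BYTES, "MY_TABLE");
            scanBytes.change(8192); // bytes read by this scan
            // metrics queued under the same key are combined during aggregation
            Map<String, Map<String, Long>> perTable = readMetrics.aggregate();
            System.out.println(perTable);
        }
    }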

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
deleted file mode 100644
index 9eca754..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.monitoring;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * 
- * Statistic that keeps track of the sum of long values that 
- * could be used to represent a phoenix metric. For performance 
- * reasons the internal state in this metric is not strictly covariant
- * and hence should only be used for monitoring and debugging purposes. 
- */
-class SizeStatistic implements Metric {
-    
-    private final AtomicLong total = new AtomicLong(0);
-    private final AtomicLong numSamples = new AtomicLong(0);
-    private final String name;
-    private final String description;
-    
-    public SizeStatistic(String name, String description) {
-        this.name = name;
-        this.description = description;
-    }
-    
-    @Override
-    public String getName() {
-        return name;
-    }
-    
-    @Override
-    public String getDescription() {
-        return description;
-    }   
-
-    @Override
-    public void reset() {
-        total.set(0);
-        numSamples.set(0);
-    }
-    
-    @Override
-    public String getCurrentMetricState() {
-        return "Name:" + description + ", Total: " + total.get() + ", Number of samples: " + numSamples.get();
-    }
-
-    @Override
-    public long getNumberOfSamples() {
-        return numSamples.get();
-    }
-
-    @Override
-    public long getTotalSum() {
-        return total.get();
-    }
-    
-    public long add(long value) {
-        // there is a race condition here but what the heck.
-        numSamples.incrementAndGet();
-        return total.addAndGet(value);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
new file mode 100644
index 0000000..4373887
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+
+/**
+ * Class that encapsulates the various metrics associated with the spooling done by Phoenix as part of servicing a
+ * request.
+ */
+public class SpoolingMetricsHolder {
+
+    private final CombinableMetric spoolFileSizeMetric;
+    private final CombinableMetric numSpoolFileMetric;
+    public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(false), "");
+
+    public SpoolingMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+        this.spoolFileSizeMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_SIZE, tableName);
+        this.numSpoolFileMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_COUNTER, tableName);
+    }
+
+    public CombinableMetric getSpoolFileSizeMetric() {
+        return spoolFileSizeMetric;
+    }
+
+    public CombinableMetric getNumSpoolFileMetric() {
+        return numSpoolFileMetric;
+    }
+}
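
A minimal sketch of how spooling code is expected to report through the holder
(the sketch class, table name, and file size are hypothetical):

    import org.apache.phoenix.monitoring.ReadMetricQueue;
    import org.apache.phoenix.monitoring.SpoolingMetricsHolder;

    public class SpoolingMetricsSketch {
        public static void main(String[] args) {
            ReadMetricQueue readMetrics = new ReadMetricQueue(true);
            SpoolingMetricsHolder spoolMetrics = new SpoolingMetricsHolder(readMetrics, "MY_TABLE");
            // after spilling results to a spool file on disk:
            spoolMetrics.getNumSpoolFileMetric().increment();       // one more spool file
            spoolMetrics.getSpoolFileSizeMetric().change(1048576L); // its size in bytes
        }
    }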

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
new file mode 100644
index 0000000..98ff57c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
+
+
+/**
+ * Class to encapsulate the various metrics associated with submitting a task to the Phoenix client thread pool and
+ * executing it.
+ */
+public class TaskExecutionMetricsHolder {
+
+    private final CombinableMetric taskQueueWaitTime;
+    private final CombinableMetric taskEndToEndTime;
+    private final CombinableMetric taskExecutionTime;
+    private final CombinableMetric numTasks;
+    private final CombinableMetric numRejectedTasks;
+    public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(false), "");
+    
+    public TaskExecutionMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+        taskQueueWaitTime = readMetrics.allotMetric(TASK_QUEUE_WAIT_TIME, tableName);
+        taskEndToEndTime = readMetrics.allotMetric(TASK_END_TO_END_TIME, tableName);
+        taskExecutionTime = readMetrics.allotMetric(TASK_EXECUTION_TIME, tableName);
+        numTasks = readMetrics.allotMetric(TASK_EXECUTED_COUNTER, tableName);
+        numRejectedTasks = readMetrics.allotMetric(TASK_REJECTED_COUNTER, tableName);
+    }
+
+    public CombinableMetric getTaskQueueWaitTime() {
+        return taskQueueWaitTime;
+    }
+
+    public CombinableMetric getTaskEndToEndTime() {
+        return taskEndToEndTime;
+    }
+
+    public CombinableMetric getTaskExecutionTime() {
+        return taskExecutionTime;
+    }
+
+    public CombinableMetric getNumTasks() {
+        return numTasks;
+    }
+
+    public CombinableMetric getNumRejectedTasks() {
+        return numRejectedTasks;
+    }
+
+}
\ No newline at end of file
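
A minimal sketch of how a task wrapper might feed these metrics, assuming the
wrapper captures its own timestamps (the sketch class and timing variables are
hypothetical):

    import org.apache.phoenix.monitoring.ReadMetricQueue;
    import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;

    public class TaskMetricsSketch {
        public static void main(String[] args) {
            TaskExecutionMetricsHolder taskMetrics =
                    new TaskExecutionMetricsHolder(new ReadMetricQueue(true), "MY_TABLE");
            long submitTime = System.currentTimeMillis();
            // ... the task waits in the executor queue, then starts running ...
            long startTime = System.currentTimeMillis();
            taskMetrics.getNumTasks().increment();
            taskMetrics.getTaskQueueWaitTime().change(startTime - submitTime);
            // ... the task body runs ...
            long endTime = System.currentTimeMillis();
            taskMetrics.getTaskExecutionTime().change(endTime - startTime);
            taskMetrics.getTaskEndToEndTime().change(endTime - submitTime);
        }
    }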

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
index 898a919..c16b86d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
@@ -45,7 +45,7 @@ public abstract class BaseQueryServicesImpl implements QueryServices {
                 options.getKeepAliveMs(), 
                 options.getThreadPoolSize(), 
                 options.getQueueSize(),
-                options.isMetricsEnabled());
+                options.isGlobalMetricsEnabled());
         this.memoryManager = new GlobalMemoryManager(
                 Runtime.getRuntime().maxMemory() * options.getMaxMemoryPerc() / 100,
                 options.getMaxMemoryWaitMs());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 3e7d084..825cc83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -157,7 +157,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String DELAY_FOR_SCHEMA_UPDATE_CHECK = "phoenix.schema.change.delay";
     public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells";
     public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls";
-    public static final String METRICS_ENABLED = "phoenix.query.metrics.enabled";
+    public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled";
     
     // rpc queue configs
     public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
@@ -165,6 +165,7 @@ public interface QueryServices extends SQLCloseable {
     
     public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder";
     public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions";
+    public static final String COLLECT_REQUEST_LEVEL_METRICS = "phoenix.query.request.metrics.enabled";
     
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 02c695e..4e8879b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -18,15 +18,16 @@
 package org.apache.phoenix.query;
 
 import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE;
-import static org.apache.phoenix.query.QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.GLOBAL_METRICS_ENABLED;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
@@ -43,7 +44,6 @@ import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LI
 import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.METRICS_ENABLED;
 import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
@@ -187,13 +187,14 @@ public class QueryServicesOptions {
 
     // TODO Change this to true as part of PHOENIX-1543
     public static final boolean DEFAULT_AUTO_COMMIT = false;
-    public static final boolean DEFAULT_IS_METRICS_ENABLED = true;
+    public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true;
     
     private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
     
     public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false;
     public static final boolean DEFAULT_FORCE_ROW_KEY_ORDER = false;
     public static final boolean DEFAULT_ALLOW_USER_DEFINED_FUNCTIONS = false;
+    public static final boolean DEFAULT_REQUEST_LEVEL_METRICS_ENABLED = false;
 
     private final Configuration config;
 
@@ -246,10 +247,11 @@ public class QueryServicesOptions {
             .setIfUnset(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE)
             .setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK)
             .setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK)
-            .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED)
+            .setIfUnset(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED)
             .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY)
             .setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX)
-            .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER);
+            .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER)
+            .setIfUnset(COLLECT_REQUEST_LEVEL_METRICS, DEFAULT_REQUEST_LEVEL_METRICS_ENABLED)
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
@@ -445,10 +447,10 @@ public class QueryServicesOptions {
         return config.getInt(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES);
     }
     
-    public boolean isMetricsEnabled() {
-        return config.getBoolean(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED);
+    public boolean isGlobalMetricsEnabled() {
+        return config.getBoolean(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED);
     }
-    
+
     public boolean isUseByteBasedRegex() {
         return config.getBoolean(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX);
     }
@@ -526,11 +528,7 @@ public class QueryServicesOptions {
         return this;
     
     }
-    public QueryServicesOptions setMetricsEnabled(boolean flag) {
-        config.setBoolean(METRICS_ENABLED, flag);
-        return this;
-    }
-
+    
     public QueryServicesOptions setUseByteBasedRegex(boolean flag) {
         config.setBoolean(USE_BYTE_BASED_REGEX_ATTRIB, flag);
         return this;
@@ -540,4 +538,5 @@ public class QueryServicesOptions {
         config.setBoolean(FORCE_ROW_KEY_ORDER_ATTRIB, forceRowKeyOrder);
         return this;
     }
+    
 }
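
With these defaults, global metrics stay on unless explicitly disabled, while
request-level metrics stay off unless opted into. A minimal sketch of flipping
both keys in a client-side Hadoop Configuration (the sketch class is hypothetical;
the keys are the ones defined in QueryServices above):

    import org.apache.hadoop.conf.Configuration;

    public class MetricsDefaultsSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setBoolean("phoenix.query.global.metrics.enabled", true);  // GLOBAL_METRICS_ENABLED
            conf.setBoolean("phoenix.query.request.metrics.enabled", true); // COLLECT_REQUEST_LEVEL_METRICS
            System.out.println(conf.getBoolean("phoenix.query.request.metrics.enabled", false));
        }
    }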

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
index 265fc78..159e0c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
@@ -17,12 +17,23 @@
  */
 package org.apache.phoenix.trace;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import org.apache.commons.configuration.Configuration;
+import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
+import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
+import static org.apache.phoenix.metrics.MetricInfo.END;
+import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import static org.apache.phoenix.metrics.MetricInfo.PARENT;
+import static org.apache.phoenix.metrics.MetricInfo.SPAN;
+import static org.apache.phoenix.metrics.MetricInfo.START;
+import static org.apache.phoenix.metrics.MetricInfo.TAG;
+import static org.apache.phoenix.metrics.MetricInfo.TRACE;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,20 +42,15 @@ import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.phoenix.metrics.*;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.metrics.Metrics;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.QueryUtil;
 
-import javax.annotation.Nullable;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.*;
-
-import static org.apache.phoenix.metrics.MetricInfo.*;
-import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 
 /**
  * Write the metrics to a phoenix table.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index 06534d1..cbe5a1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -154,4 +154,9 @@ public class JDBCUtil {
         }
         return Boolean.valueOf(autoCommit);
     }
+    
+    public static boolean isCollectingRequestLevelMetricsEnabled(String url, Properties overrideProps, ReadOnlyProps queryServicesProps) throws SQLException {
+        String requestMetricsEnabled = findProperty(url, overrideProps, PhoenixRuntime.REQUEST_METRIC_ATTRIB);
+        return (requestMetricsEnabled == null ? queryServicesProps.getBoolean(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, QueryServicesOptions.DEFAULT_REQUEST_LEVEL_METRICS_ENABLED) : Boolean.parseBoolean(requestMetricsEnabled));
+    }
 }
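
The resolution order above is: an explicit connection/URL property wins, otherwise
the phoenix.query.request.metrics.enabled config value applies, defaulting to off.
A minimal sketch of the per-connection override (the sketch class and JDBC URL are
placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    import org.apache.phoenix.util.PhoenixRuntime;

    public class RequestMetricsSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // per-connection override, checked first by isCollectingRequestLevelMetricsEnabled()
            props.setProperty(PhoenixRuntime.REQUEST_METRIC_ATTRIB, "true");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
                // queries on this connection now collect request-level metrics
            }
        }
    }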