Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/06/27 01:29:06 UTC

[4/4] phoenix git commit: PHOENIX-1819 Build a framework to capture and report phoenix client side request level metrics

PHOENIX-1819 Build a framework to capture and report phoenix client side request level metrics


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7e29d57b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7e29d57b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7e29d57b

Branch: refs/heads/4.x-HBase-0.98
Commit: 7e29d57bda0eb66b1dbd102060253e1b1468f052
Parents: 006aec5
Author: Samarth <sa...@salesforce.com>
Authored: Fri Jun 26 16:28:50 2015 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Fri Jun 26 16:28:50 2015 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/PhoenixMetricsIT.java       | 147 ----
 .../apache/phoenix/execute/PartialCommitIT.java |   1 +
 .../phoenix/monitoring/PhoenixMetricsIT.java    | 815 +++++++++++++++++++
 .../apache/phoenix/cache/ServerCacheClient.java |   7 +
 .../apache/phoenix/compile/DeleteCompiler.java  |  50 +-
 .../MutatingParallelIteratorFactory.java        |  51 +-
 .../phoenix/compile/StatementContext.java       |  49 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |  80 +-
 .../apache/phoenix/execute/AggregatePlan.java   |   8 +-
 .../apache/phoenix/execute/HashJoinPlan.java    |   7 +
 .../apache/phoenix/execute/MutationState.java   | 290 ++++---
 .../org/apache/phoenix/execute/UnionPlan.java   |   8 +-
 .../phoenix/iterate/BaseResultIterators.java    |  15 +-
 .../phoenix/iterate/ChunkedResultIterator.java  |  21 +-
 .../iterate/ParallelIteratorFactory.java        |   4 +-
 .../phoenix/iterate/ParallelIterators.java      |  25 +-
 .../iterate/RoundRobinResultIterator.java       |   4 +-
 .../phoenix/iterate/ScanningResultIterator.java |  38 +-
 .../apache/phoenix/iterate/SerialIterators.java |  23 +-
 .../phoenix/iterate/SpoolingResultIterator.java |  49 +-
 .../phoenix/iterate/TableResultIterator.java    |  17 +-
 .../phoenix/iterate/UnionResultIterators.java   |  70 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  28 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |  21 +-
 .../apache/phoenix/jdbc/PhoenixResultSet.java   |  48 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  20 +-
 .../java/org/apache/phoenix/job/JobManager.java |  60 +-
 .../phoenix/mapreduce/CsvBulkLoadTool.java      |  10 +-
 .../phoenix/mapreduce/PhoenixRecordReader.java  |  12 +-
 .../phoenix/memory/GlobalMemoryManager.java     |   5 -
 .../apache/phoenix/monitoring/AtomicMetric.java |  70 ++
 .../phoenix/monitoring/CombinableMetric.java    |  77 ++
 .../monitoring/CombinableMetricImpl.java        |  77 ++
 .../org/apache/phoenix/monitoring/Counter.java  |  85 --
 .../phoenix/monitoring/GlobalClientMetrics.java | 117 +++
 .../apache/phoenix/monitoring/GlobalMetric.java |  37 +
 .../phoenix/monitoring/GlobalMetricImpl.java    |  74 ++
 .../phoenix/monitoring/MemoryMetricsHolder.java |  43 +
 .../org/apache/phoenix/monitoring/Metric.java   |  45 +-
 .../apache/phoenix/monitoring/MetricType.java   |  55 ++
 .../phoenix/monitoring/MetricsStopWatch.java    |  59 ++
 .../phoenix/monitoring/MutationMetricQueue.java | 131 +++
 .../phoenix/monitoring/NonAtomicMetric.java     |  71 ++
 .../phoenix/monitoring/OverAllQueryMetrics.java | 121 +++
 .../phoenix/monitoring/PhoenixMetrics.java      | 118 ---
 .../phoenix/monitoring/ReadMetricQueue.java     | 180 ++++
 .../phoenix/monitoring/SizeStatistic.java       |  78 --
 .../monitoring/SpoolingMetricsHolder.java       |  43 +
 .../monitoring/TaskExecutionMetricsHolder.java  |  68 ++
 .../phoenix/query/BaseQueryServicesImpl.java    |   2 +-
 .../org/apache/phoenix/query/QueryServices.java |   3 +-
 .../phoenix/query/QueryServicesOptions.java     |  25 +-
 .../phoenix/trace/PhoenixMetricsSink.java       |  36 +-
 .../java/org/apache/phoenix/util/JDBCUtil.java  |   5 +
 .../org/apache/phoenix/util/PhoenixRuntime.java | 175 +++-
 .../iterate/SpoolingResultIteratorTest.java     |   4 +-
 56 files changed, 2933 insertions(+), 849 deletions(-)
----------------------------------------------------------------------
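
For readers skimming the patch: the new request-level metrics surface through
PhoenixRuntime and are enabled via the QueryServices.COLLECT_REQUEST_LEVEL_METRICS
property, as the integration test below exercises. A minimal usage sketch follows;
the JDBC URL and table name are placeholders, while the property key and the
PhoenixRuntime methods are the ones used in this patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.phoenix.query.QueryServices;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class RequestMetricsSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Opt in to request-level metric collection for this connection.
            props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true");
            // "jdbc:phoenix:localhost" and table T are placeholders.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
                 ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM T")) {
                while (rs.next()) { /* drain rows so read metrics accumulate */ }
                // Read metrics for this query: table name -> (metric name -> value).
                Map<String, Map<String, Long>> readMetrics =
                        PhoenixRuntime.getRequestReadMetrics(rs);
                // Write metrics gathered on the connection since the last reset.
                Map<String, Map<String, Long>> writeMetrics =
                        PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
                System.out.println(readMetrics);
                System.out.println(writeMetrics);
            }
        }
    }

PhoenixRuntime.resetMetrics(...), also added in this patch, clears the accumulated
request-level counters between requests.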


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
deleted file mode 100644
index edb4042..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.MUTATION_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.NUM_SPOOL_FILE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_TIMEOUT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.REJECTED_TASK_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BATCH_SIZE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BYTES;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_COMMIT_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_SCANS;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.QUERY_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SCAN_BYTES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-
-import org.apache.phoenix.monitoring.Metric;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.junit.Test;
-
-public class PhoenixMetricsIT extends BaseHBaseManagedTimeIT {
-    
-    @Test
-    public void testResetPhoenixMetrics() {
-        resetMetrics();
-        for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) {
-            assertEquals(0, m.getTotalSum());
-            assertEquals(0, m.getNumberOfSamples());
-        }
-    }
-    
-    @Test
-    public void testPhoenixMetricsForQueries() throws Exception {
-        createTableAndInsertValues("T", true);
-        resetMetrics(); // we want to count metrics related only to the below query
-        Connection conn = DriverManager.getConnection(getUrl());
-        String query = "SELECT * FROM T";
-        ResultSet rs = conn.createStatement().executeQuery(query);
-        while (rs.next()) {
-            rs.getString(1);
-            rs.getString(2);
-        }
-        assertEquals(1, PARALLEL_SCANS.getMetric().getTotalSum());
-        assertEquals(1, QUERY_COUNT.getMetric().getTotalSum());
-        assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
-        assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
-        assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
-        assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
-        assertEquals(0, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
-        assertEquals(0, MUTATION_BYTES.getMetric().getTotalSum());
-        assertEquals(0, MUTATION_COMMIT_TIME.getMetric().getTotalSum());
-        
-        assertTrue(SCAN_BYTES.getMetric().getTotalSum() > 0);
-        assertTrue(QUERY_TIME.getMetric().getTotalSum() > 0);
-    }
-    
-    @Test
-    public void testPhoenixMetricsForMutations() throws Exception {
-        createTableAndInsertValues("T", true);
-        assertEquals(10, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
-        assertEquals(10, MUTATION_COUNT.getMetric().getTotalSum());
-        assertTrue(MUTATION_BYTES.getMetric().getTotalSum() > 0);
-        assertTrue(MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
-        assertEquals(0, PARALLEL_SCANS.getMetric().getTotalSum());
-        assertEquals(0, QUERY_COUNT.getMetric().getTotalSum());
-        assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
-        assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
-        assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
-        assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
-    }
-    
-    
-    @Test
-    public void testPhoenixMetricsForUpsertSelect() throws Exception { 
-        createTableAndInsertValues("T", true);
-        resetMetrics();
-        String ddl = "CREATE TABLE T2 (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
-        Connection conn = DriverManager.getConnection(getUrl());
-        conn.createStatement().execute(ddl);
-        resetMetrics();
-        String dml = "UPSERT INTO T2 (K, V) SELECT K, V FROM T";
-        conn.createStatement().executeUpdate(dml);
-        conn.commit();
-        assertEquals(10, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
-        assertEquals(1, MUTATION_COUNT.getMetric().getTotalSum());
-        assertEquals(1, PARALLEL_SCANS.getMetric().getTotalSum());
-        assertEquals(0, QUERY_TIME.getMetric().getTotalSum());
-        assertTrue(SCAN_BYTES.getMetric().getTotalSum() > 0);
-        assertTrue(MUTATION_BYTES.getMetric().getTotalSum() > 0);
-        assertTrue(MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
-        assertEquals(0, QUERY_COUNT.getMetric().getTotalSum());
-        assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
-        assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
-        assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
-        assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
-    }
-    
-    private static void resetMetrics() {
-        for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) {
-            m.reset();
-        }
-    }
-    
-    private static void createTableAndInsertValues(String tableName, boolean resetMetricsAfterTableCreate) throws Exception {
-        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
-        Connection conn = DriverManager.getConnection(getUrl());
-        conn.createStatement().execute(ddl);
-        if (resetMetricsAfterTableCreate) {
-            resetMetrics();
-        }
-        // executing 10 upserts/mutations.
-        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
-        PreparedStatement stmt = conn.prepareStatement(dml);
-        for (int i = 1; i <= 10; i++) {
-            stmt.setString(1, "key" + i);
-            stmt.setString(2, "value" + i);
-            stmt.executeUpdate();
-        }
-        conn.commit();
-    }
-    
-    
-    
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index c8696e2..e0f0a3c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -260,6 +260,7 @@ public class PartialCommitIT {
         PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
         final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
         return new PhoenixConnection(phxCon) {
+            @Override
             protected MutationState newMutationState(int maxSize) {
                 return new MutationState(maxSize, this, mutations);
             };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
new file mode 100644
index 0000000..d9ca8e8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -0,0 +1,815 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class PhoenixMetricsIT extends BaseOwnClusterHBaseManagedTimeIT {
+
+    private static final List<String> mutationMetricsToSkip = Lists
+            .newArrayList(MetricType.MUTATION_COMMIT_TIME.name());
+    private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(),
+            MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name());
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // Enable request metric collection at the driver level
+        props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testResetGlobalPhoenixMetrics() {
+        resetGlobalMetrics();
+        for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+            assertEquals(0, m.getTotalSum());
+            assertEquals(0, m.getNumberOfSamples());
+        }
+    }
+
+    @Test
+    public void testGlobalPhoenixMetricsForQueries() throws Exception {
+        createTableAndInsertValues("T", true);
+        resetGlobalMetrics(); // we want to count metrics related only to the below query
+        Connection conn = DriverManager.getConnection(getUrl());
+        String query = "SELECT * FROM T";
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        while (rs.next()) {
+            rs.getString(1);
+            rs.getString(2);
+        }
+        assertEquals(1, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
+        assertEquals(1, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_MUTATION_BYTES.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum());
+
+        assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0);
+        assertTrue(GLOBAL_QUERY_TIME.getMetric().getTotalSum() > 0);
+        assertTrue(GLOBAL_TASK_END_TO_END_TIME.getMetric().getTotalSum() > 0);
+        assertTrue(GLOBAL_TASK_EXECUTION_TIME.getMetric().getTotalSum() > 0);
+    }
+
+    @Test
+    public void testGlobalPhoenixMetricsForMutations() throws Exception {
+        createTableAndInsertValues("T", true);
+        assertEquals(10, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+        assertEquals(10, GLOBAL_MUTATION_SQL_COUNTER.getMetric().getTotalSum());
+        assertTrue(GLOBAL_MUTATION_BYTES.getMetric().getTotalSum() > 0);
+        assertTrue(GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
+        assertEquals(0, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+    }
+
+    @Test
+    public void testGlobalPhoenixMetricsForUpsertSelect() throws Exception {
+        createTableAndInsertValues("T", true);
+        resetGlobalMetrics();
+        String ddl = "CREATE TABLE T2 (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+        resetGlobalMetrics();
+        String dml = "UPSERT INTO T2 (K, V) SELECT K, V FROM T";
+        conn.createStatement().executeUpdate(dml);
+        conn.commit();
+        assertEquals(10, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+        assertEquals(1, GLOBAL_MUTATION_SQL_COUNTER.getMetric().getTotalSum());
+        assertEquals(1, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_QUERY_TIME.getMetric().getTotalSum());
+        assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0);
+        assertTrue(GLOBAL_MUTATION_BYTES.getMetric().getTotalSum() > 0);
+        assertTrue(GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
+        assertEquals(0, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+    }
+
+    private static void resetGlobalMetrics() {
+        for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+            m.reset();
+        }
+    }
+
+    private static void createTableAndInsertValues(String tableName, boolean resetGlobalMetricsAfterTableCreate)
+            throws Exception {
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+        if (resetGlobalMetricsAfterTableCreate) {
+            resetGlobalMetrics();
+        }
+        // executing 10 upserts/mutations.
+        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        for (int i = 1; i <= 10; i++) {
+            stmt.setString(1, "key" + i);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+        conn.commit();
+    }
+
+    @Test
+    public void testOverallQueryMetricsForSelect() throws Exception {
+        String tableName = "SCANMETRICS";
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 6";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+    }
+
+    @Test
+    public void testReadMetricsForSelect() throws Exception {
+        String tableName = "READMETRICSFORSELECT";
+        long numSaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + numSaltBuckets;
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+
+        long numRows = 1000;
+        long numExpectedTasks = numSaltBuckets;
+        insertRowsInTable(tableName, numRows);
+
+        String query = "SELECT * FROM " + tableName;
+        Statement stmt = conn.createStatement();
+        ResultSet rs = stmt.executeQuery(query);
+        PhoenixResultSet resultSetBeingTested = rs.unwrap(PhoenixResultSet.class);
+        changeInternalStateForTesting(resultSetBeingTested);
+        while (resultSetBeingTested.next()) {} // drain the result set so read metrics accumulate
+        resultSetBeingTested.close();
+        Set<String> expectedTableNames = Sets.newHashSet(tableName);
+        assertReadMetricValuesForSelectSql(Lists.newArrayList(numRows), Lists.newArrayList(numExpectedTasks),
+                resultSetBeingTested, expectedTableNames);
+    }
+
+    @Test
+    public void testMetricsForUpsert() throws Exception {
+        String tableName = "UPSERTMETRICS";
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 6";
+        Connection ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+
+        int numRows = 10;
+        Connection conn = insertRowsInTable(tableName, numRows);
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+        for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) {
+            String t = entry.getKey();
+            assertEquals("Table names didn't match!", tableName, t);
+            Map<String, Long> p = entry.getValue();
+            assertEquals("There should have been three metrics", 3, p.size());
+            boolean mutationBatchSizePresent = false;
+            boolean mutationCommitTimePresent = false;
+            boolean mutationBytesPresent = false;
+            for (Entry<String, Long> metric : p.entrySet()) {
+                String metricName = metric.getKey();
+                long metricValue = metric.getValue();
+                if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) {
+                    assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
+                    mutationBatchSizePresent = true;
+                } else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) {
+                    assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
+                    mutationCommitTimePresent = true;
+                } else if (metricName.equals(MetricType.MUTATION_BYTES.name())) {
+                    assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
+                    mutationBytesPresent = true;
+                }
+            }
+            assertTrue(mutationBatchSizePresent);
+            assertTrue(mutationCommitTimePresent);
+            assertTrue(mutationBytesPresent);
+        }
+        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        assertEquals("Read metrics should be empty", 0, readMetrics.size());
+    }
+
+    @Test
+    public void testMetricsForUpsertSelect() throws Exception {
+        String tableName1 = "UPSERTFROM";
+        long table1SaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + table1SaltBuckets;
+        Connection ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+        int numRows = 10;
+        insertRowsInTable(tableName1, numRows);
+
+        String tableName2 = "UPSERTTO";
+        ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10";
+        ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+
+        Connection conn = DriverManager.getConnection(getUrl());
+        String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
+        conn.createStatement().executeUpdate(upsertSelect);
+        conn.commit();
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+
+        Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+        assertMutationMetrics(tableName2, numRows, mutationMetrics);
+        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        assertReadMetricsForMutatingSql(tableName1, table1SaltBuckets, readMetrics);
+    }
+
+    @Test
+    public void testMetricsForDelete() throws Exception {
+        String tableName = "DELETEMETRICS";
+        long tableSaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + tableSaltBuckets;
+        Connection ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+        int numRows = 10;
+        insertRowsInTable(tableName, numRows);
+        Connection conn = DriverManager.getConnection(getUrl());
+        String delete = "DELETE FROM " + tableName;
+        conn.createStatement().execute(delete);
+        conn.commit();
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+        assertMutationMetrics(tableName, numRows, mutationMetrics);
+
+        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        assertReadMetricsForMutatingSql(tableName, tableSaltBuckets, readMetrics);
+    }
+
+    @Test
+    public void testNoMetricsCollectedForConnection() throws Exception {
+        String tableName = "NOMETRICS";
+        long tableSaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + tableSaltBuckets;
+        Connection ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+        int numRows = 10;
+        insertRowsInTable(tableName, numRows);
+        Properties props = new Properties();
+        props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "false");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+        while (rs.next()) {}
+        rs.close();
+        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getRequestReadMetrics(rs);
+        assertTrue("No read metrics should have been generated", readMetrics.size() == 0);
+        conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " VALUES ('KEY', 'VALUE')");
+        conn.commit();
+        Map<String, Map<String, Long>> writeMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+        assertTrue("No write metrics should have been generated", writeMetrics.size() == 0);
+    }
+
+    @Test
+    public void testMetricsForUpsertWithAutoCommit() throws Exception {
+        String tableName = "VERIFYUPSERTAUTOCOMMIT";
+        long tableSaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + tableSaltBuckets;
+        try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
+            ddlConn.createStatement().execute(ddl);
+        }
+
+        String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        int numRows = 10;
+        Map<String, Map<String, Long>> mutationMetricsForAutoCommitOff = null;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            upsertRows(upsert, numRows, conn);
+            conn.commit();
+            mutationMetricsForAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+        }
+
+        // Insert rows now with auto-commit on
+        Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            upsertRows(upsert, numRows, conn);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+        }
+        // Verify that the mutation metrics are same for both cases
+        assertMetricsAreSame(mutationMetricsForAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+    }
+
+    private void upsertRows(String upsert, int numRows, Connection conn) throws SQLException {
+        PreparedStatement stmt = conn.prepareStatement(upsert);
+        for (int i = 1; i <= numRows; i++) {
+            stmt.setString(1, "key" + i);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+    }
+
+    @Test
+    public void testMetricsForDeleteWithAutoCommit() throws Exception {
+        String tableName = "VERIFYDELETEAUTOCOMMIT";
+        long tableSaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + tableSaltBuckets;
+        try (Connection ddlConn = DriverManager.getConnection(getUrl())) {
+            ddlConn.createStatement().execute(ddl);
+        }
+
+        String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        int numRows = 10;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            upsertRows(upsert, numRows, conn);
+            conn.commit();
+        }
+
+        String delete = "DELETE FROM " + tableName;
+        // Delete rows now with auto-commit off
+        Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOff = null;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            conn.createStatement().executeUpdate(delete);
+            deleteMetricsWithAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+        }
+
+        // Upsert the rows back
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            upsertRows(upsert, numRows, conn);
+            conn.commit();
+        }
+
+        // Now delete rows with auto-commit on
+        Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOn = null;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            conn.createStatement().executeUpdate(delete);
+            deleteMetricsWithAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+        }
+
+        // Verify that the mutation metrics are same for both cases.
+        assertMetricsAreSame(deleteMetricsWithAutoCommitOff, deleteMetricsWithAutoCommitOn, mutationMetricsToSkip);
+    }
+
+    @Test
+    public void testMetricsForUpsertSelectWithAutoCommit() throws Exception {
+        String tableName1 = "UPSERTFROMAUTOCOMMIT";
+        long table1SaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + table1SaltBuckets;
+        Connection ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+        int numRows = 10;
+        insertRowsInTable(tableName1, numRows);
+
+        String tableName2 = "UPSERTTOAUTOCOMMIT";
+        ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10";
+        ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+
+        String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
+
+        Map<String, Map<String, Long>> mutationMetricsAutoCommitOff = null;
+        Map<String, Map<String, Long>> readMetricsAutoCommitOff = null;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            conn.createStatement().executeUpdate(upsertSelect);
+            conn.commit();
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            mutationMetricsAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOff = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        }
+
+        Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null;
+        Map<String, Map<String, Long>> readMetricsAutoCommitOn = null;
+
+        int autoCommitBatchSize = numRows + 1; // batchsize = 11 is greater than numRows, so all rows commit in a single batch
+        Properties props = new Properties();
+        props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            conn.createStatement().executeUpdate(upsertSelect);
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        }
+        assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+        assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+
+        autoCommitBatchSize = numRows - 1; // batchsize = 9 is less than numRows and is not a divisor of numRows
+        props = new Properties();
+        props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            conn.createStatement().executeUpdate(upsertSelect);
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        }
+        assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+        assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+
+        autoCommitBatchSize = numRows; // batchsize equals numRows, so the rows commit as one exact batch
+        props = new Properties();
+        props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            conn.createStatement().executeUpdate(upsertSelect);
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        }
+        assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+        assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+
+        autoCommitBatchSize = 2; // multiple batches of equal size
+        props = new Properties();
+        props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            conn.createStatement().executeUpdate(upsertSelect);
+            PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        }
+        assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
+        assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
+    }
+
+    @Test
+    public void testMutationMetricsWhenUpsertingToMultipleTables() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String table1 = "TABLE1";
+            createTableAndInsertValues(true, 10, conn, table1);
+            String table2 = "TABLE2";
+            createTableAndInsertValues(true, 10, conn, table2);
+            String table3 = "TABLE3";
+            createTableAndInsertValues(true, 10, conn, table3);
+            Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+            assertTrue("Mutation metrics not present for " + table1, mutationMetrics.get(table1) != null);
+            assertTrue("Mutation metrics not present for " + table2, mutationMetrics.get(table2) != null);
+            assertTrue("Mutation metrics not present for " + table3, mutationMetrics.get(table3) != null);
+            assertMetricsHaveSameValues(mutationMetrics.get(table1), mutationMetrics.get(table2), mutationMetricsToSkip);
+            assertMetricsHaveSameValues(mutationMetrics.get(table1), mutationMetrics.get(table3), mutationMetricsToSkip);
+        }
+    }
+
+    @Test
+    public void testClosingConnectionClearsMetrics() throws Exception {
+        Connection conn = null;
+        try {
+            conn = DriverManager.getConnection(getUrl());
+            createTableAndInsertValues(true, 10, conn, "clearmetrics");
+            assertTrue("Mutation metrics not present", PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() > 0);
+        } finally {
+            if (conn != null) {
+                conn.close();
+                assertTrue("Closing connection didn't clear metrics",
+                        PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() == 0);
+            }
+        }
+    }
+
+    @Test
+    public void testMetricsForUpsertingIntoImmutableTableWithIndices() throws Exception {
+        String dataTable = "IMMTABLEWITHINDICES";
+        String tableDdl = "CREATE TABLE "
+                + dataTable
+                + " (K1 VARCHAR NOT NULL, K2 VARCHAR NOT NULL, V1 INTEGER, V2 INTEGER, V3 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(K1, K2)) IMMUTABLE_ROWS = true";
+        String index1 = "I1";
+        String index1Ddl = "CREATE INDEX " + index1 + " ON " + dataTable + " (V1) include (V2)";
+        String index2 = "I2";
+        String index2Ddl = "CREATE INDEX " + index2 + " ON " + dataTable + " (V2) include (V3)";
+        String index3 = "I3";
+        String index3Ddl = "CREATE INDEX " + index3 + " ON " + dataTable + " (V3) include (V1)";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(tableDdl);
+            conn.createStatement().execute(index1Ddl);
+            conn.createStatement().execute(index2Ddl);
+            conn.createStatement().execute(index3Ddl);
+        }
+        String upsert = "UPSERT INTO " + dataTable + " VALUES (?, ?, ?, ?, ?)";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            /*
+             * Upsert data into table. Because the table is immutable, mutations for updating the indices on it are
+             * handled by the client itself. So mutation metrics should include mutations for the indices as well as the
+             * data table.
+             */
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            for (int i = 1; i < 10; i++) {
+                stmt.setString(1, "key1" + i);
+                stmt.setString(2, "key2" + i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.setInt(5, i);
+                stmt.executeUpdate();
+            }
+            conn.commit();
+            Map<String, Map<String, Long>> metrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+            assertTrue(metrics.get(dataTable).size() > 0);
+            assertTrue(metrics.get(index1).size() > 0);
+            assertTrue(metrics.get(index2).size() > 0);
+            assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index2), mutationMetricsToSkip);
+            assertTrue(metrics.get(index3).size() > 0);
+            assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index3), mutationMetricsToSkip);
+        }
+    }
+    
+    @Test
+    public void testMetricsForUpsertSelectSameTable() throws Exception {
+        String tableName = "UPSERTSAME";
+        long table1SaltBuckets = 6;
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
+                + table1SaltBuckets;
+        Connection ddlConn = DriverManager.getConnection(getUrl());
+        ddlConn.createStatement().execute(ddl);
+        ddlConn.close();
+        int numRows = 10;
+        insertRowsInTable(tableName, numRows);
+
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.setAutoCommit(false);
+        String upsertSelect = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName;
+        conn.createStatement().executeUpdate(upsertSelect);
+        conn.commit();
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        
+        Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+        // Because auto-commit is off, upsert select into the same table will run on the client.
+        // So we should have client side read and write metrics available.
+        assertMutationMetrics(tableName, numRows, mutationMetrics);
+        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        assertReadMetricsForMutatingSql(tableName, table1SaltBuckets, readMetrics);
+        PhoenixRuntime.resetMetrics(pConn);
+        // Even with auto-commit on, this upsert select still runs on the client side.
+        conn.setAutoCommit(true);
+        conn.createStatement().executeUpdate(upsertSelect);
+        Map<String, Map<String, Long>> autoCommitMutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+        Map<String, Map<String, Long>> autoCommitReadMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        assertMetricsAreSame(mutationMetrics, autoCommitMutationMetrics, mutationMetricsToSkip);
+        assertMetricsAreSame(readMetrics, autoCommitReadMetrics, readMetricsToSkip);
+    }
+    
+    private void createTableAndInsertValues(boolean commit, int numRows, Connection conn, String tableName)
+            throws SQLException {
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+        conn.createStatement().execute(ddl);
+        // executing numRows upserts/mutations.
+        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        for (int i = 1; i <= numRows; i++) {
+            stmt.setString(1, "key" + i);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+        if (commit) {
+            conn.commit();
+        }
+    }
+
+    private void assertMetricsAreSame(Map<String, Map<String, Long>> metric1, Map<String, Map<String, Long>> metric2,
+            List<String> metricsToSkip) {
+        assertTrue("The two metrics have different or unequal number of table names ",
+                metric1.keySet().equals(metric2.keySet()));
+        for (Entry<String, Map<String, Long>> entry : metric1.entrySet()) {
+            Map<String, Long> metricNameValueMap1 = entry.getValue();
+            Map<String, Long> metricNameValueMap2 = metric2.get(entry.getKey());
+            assertMetricsHaveSameValues(metricNameValueMap1, metricNameValueMap2, metricsToSkip);
+        }
+    }
+
+    private void assertMetricsHaveSameValues(Map<String, Long> metricNameValueMap1,
+            Map<String, Long> metricNameValueMap2, List<String> metricsToSkip) {
+        assertTrue("The two metrics have different or unequal number of metric names ", metricNameValueMap1.keySet()
+                .equals(metricNameValueMap2.keySet()));
+        for (Entry<String, Long> entry : metricNameValueMap1.entrySet()) {
+            String metricName = entry.getKey();
+            if (!metricsToSkip.contains(metricName)) {
+                assertEquals("Unequal values for metric " + metricName, entry.getValue(),
+                        metricNameValueMap2.get(metricName));
+            }
+        }
+    }
+
+    private void changeInternalStateForTesting(PhoenixResultSet rs) {
+        // get and set the internal state for testing purposes.
+        ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(true);
+        StatementContext ctx = (StatementContext)Whitebox.getInternalState(rs, "context");
+        Whitebox.setInternalState(ctx, "readMetricsQueue", testMetricsQueue);
+        Whitebox.setInternalState(rs, "readMetricsQueue", testMetricsQueue);
+    }
+
+    private void assertReadMetricValuesForSelectSql(ArrayList<Long> numRows, ArrayList<Long> numExpectedTasks,
+            PhoenixResultSet resultSetBeingTested, Set<String> expectedTableNames) throws SQLException {
+        Map<String, Map<String, Long>> metrics = PhoenixRuntime.getRequestReadMetrics(resultSetBeingTested);
+        int counter = 0;
+        for (Entry<String, Map<String, Long>> entry : metrics.entrySet()) {
+            String tableName = entry.getKey();
+            expectedTableNames.remove(tableName);
+            Map<String, Long> metricValues = entry.getValue();
+            boolean scanMetricsPresent = false;
+            boolean taskCounterMetricsPresent = false;
+            boolean taskExecutionTimeMetricsPresent = false;
+            boolean memoryMetricsPresent = false;
+            for (Entry<String, Long> pair : metricValues.entrySet()) {
+                String metricName = pair.getKey();
+                long metricValue = pair.getValue();
+                long n = numRows.get(counter);
+                long numTask = numExpectedTasks.get(counter);
+                if (metricName.equals(SCAN_BYTES.name())) {
+                    // we are using a SCAN_BYTES_DELTA of 1. So number of scan bytes read should be number of rows read
+                    assertEquals(n, metricValue);
+                    scanMetricsPresent = true;
+                } else if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
+                    assertEquals(numTask, metricValue);
+                    taskCounterMetricsPresent = true;
+                } else if (metricName.equals(TASK_EXECUTION_TIME.name())) {
+                    assertEquals(numTask * TASK_EXECUTION_TIME_DELTA, metricValue);
+                    taskExecutionTimeMetricsPresent = true;
+                } else if (metricName.equals(MEMORY_CHUNK_BYTES.name())) {
+                    assertEquals(numTask * MEMORY_CHUNK_BYTES_DELTA, metricValue);
+                    memoryMetricsPresent = true;
+                }
+            }
+            counter++;
+            assertTrue(scanMetricsPresent);
+            assertTrue(taskCounterMetricsPresent);
+            assertTrue(taskExecutionTimeMetricsPresent);
+            assertTrue(memoryMetricsPresent);
+        }
+        PhoenixRuntime.resetMetrics(resultSetBeingTested);
+        assertTrue("Metrics not found tables " + Joiner.on(",").join(expectedTableNames),
+                expectedTableNames.size() == 0);
+    }
+
+    private Connection insertRowsInTable(String tableName, long numRows) throws SQLException {
+        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        for (int i = 1; i <= numRows; i++) {
+            stmt.setString(1, "key" + i);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+        conn.commit();
+        return conn;
+    }
+
+    // with a SCAN_BYTES_DELTA of 1, total scan bytes should equal the number of records read
+    public static final long SCAN_BYTES_DELTA = 1;
+
+    // total task execution time should be numTasks * TASK_EXECUTION_TIME_DELTA
+    public static final long TASK_EXECUTION_TIME_DELTA = 10;
+
+    // total memory chunk bytes should be numTasks * MEMORY_CHUNK_BYTES_DELTA
+    public static final long MEMORY_CHUNK_BYTES_DELTA = 100;
+
+    private class TestReadMetricsQueue extends ReadMetricQueue {
+
+        public TestReadMetricsQueue(boolean isRequestMetricsEnabled) {
+            super(isRequestMetricsEnabled);
+        }
+
+        @Override
+        public CombinableMetric getMetric(MetricType type) {
+            switch (type) {
+            case SCAN_BYTES:
+                return new CombinableMetricImpl(type) {
+
+                    @Override
+                    public void change(long delta) {
+                        super.change(SCAN_BYTES_DELTA);
+                    }
+                };
+            case TASK_EXECUTION_TIME:
+                return new CombinableMetricImpl(type) {
+
+                    @Override
+                    public void change(long delta) {
+                        super.change(TASK_EXECUTION_TIME_DELTA);
+                    }
+                };
+            case MEMORY_CHUNK_BYTES:
+                return new CombinableMetricImpl(type) {
+
+                    @Override
+                    public void change(long delta) {
+                        super.change(MEMORY_CHUNK_BYTES_DELTA);
+                    }
+                };
+            }
+            return super.getMetric(type);
+        }
+    }
+
+    private void assertReadMetricsForMutatingSql(String tableName, long tableSaltBuckets,
+            Map<String, Map<String, Long>> readMetrics) {
+        assertTrue("No read metrics present when there should have been!", readMetrics.size() > 0);
+        int numTables = 0;
+        for (Entry<String, Map<String, Long>> entry : readMetrics.entrySet()) {
+            String t = entry.getKey();
+            assertEquals("Table name didn't match for read metrics", tableName, t);
+            numTables++;
+            Map<String, Long> p = entry.getValue();
+            assertTrue("No read metrics present when there should have been", p.size() > 0);
+            for (Entry<String, Long> metric : p.entrySet()) {
+                String metricName = metric.getKey();
+                long metricValue = metric.getValue();
+                if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
+                    assertEquals(tableSaltBuckets, metricValue);
+                } else if (metricName.equals(SCAN_BYTES.name())) {
+                    assertTrue("Scan bytes read should be greater than zero", metricValue > 0);
+                }
+            }
+        }
+        assertEquals("There should have been read metrics only for one table: " + tableName, 1, numTables);
+    }
+
+    private void assertMutationMetrics(String tableName, int numRows, Map<String, Map<String, Long>> mutationMetrics) {
+        assertTrue("No mutation metrics present when there should have been", mutationMetrics.size() > 0);
+        for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) {
+            String t = entry.getKey();
+            assertEquals("Table name didn't match for mutation metrics", tableName, t);
+            Map<String, Long> p = entry.getValue();
+            assertEquals("There should have been three metrics", 3, p.size());
+            for (Entry<String, Long> metric : p.entrySet()) {
+                String metricName = metric.getKey();
+                long metricValue = metric.getValue();
+                if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) {
+                    assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
+                } else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) {
+                    assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
+                } else if (metricName.equals(MetricType.MUTATION_BYTES.name())) {
+                    assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
+                }
+            }
+        }
+    }
+
+}
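
Editor's note: the assertion helpers above both walk the same nested shape: an outer map keyed by table name, with an inner map of metric name to value. A minimal sketch of consuming that shape outside a test follows; the MetricsDump class and printRequestMetrics method are hypothetical names, not Phoenix source.

    import java.util.Map;

    public class MetricsDump {
        // Walks the Map<tableName, Map<metricName, value>> shape used by the
        // read and mutation metric assertions above and prints each entry.
        public static void printRequestMetrics(Map<String, Map<String, Long>> metrics) {
            for (Map.Entry<String, Map<String, Long>> tableEntry : metrics.entrySet()) {
                String tableName = tableEntry.getKey();
                for (Map.Entry<String, Long> metric : tableEntry.getValue().entrySet()) {
                    System.out.println(tableName + "." + metric.getKey() + " = " + metric.getValue());
                }
            }
        }
    }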

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 9718709..9ad9ef5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.cache;
 
+import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INSTANCE;
 import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
 
 import java.io.Closeable;
@@ -57,6 +58,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachin
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -226,6 +228,11 @@ public class ServerCacheClient {
                         public Object getJobId() {
                             return ServerCacheClient.this;
                         }
+                        
+                        @Override
+                        public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+                            return NO_OP_INSTANCE;
+                        }
                     }));
                 } else {
                     if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry", connection));}
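
Editor's note: the hunk above opts the server-cache job out of task metrics by returning NO_OP_INSTANCE. For contrast, here is a hedged sketch of a JobCallable that does report task metrics; the TaskExecutionMetricsHolder(readMetrics, tableName) constructor is an assumption based on how the parallel iterators elsewhere in this patch wire it up, and MetricsAwareJobs is a hypothetical class.

    import org.apache.phoenix.job.JobManager.JobCallable;
    import org.apache.phoenix.monitoring.ReadMetricQueue;
    import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;

    public class MetricsAwareJobs {
        // Returns a job whose task-level timings are recorded against the given
        // table, rather than discarded via NO_OP_INSTANCE as the cache job above does.
        public static JobCallable<Void> newJob(final ReadMetricQueue readMetrics, final String tableName) {
            return new JobCallable<Void>() {
                private final TaskExecutionMetricsHolder taskMetrics =
                        new TaskExecutionMetricsHolder(readMetrics, tableName); // assumed ctor

                @Override
                public Void call() throws Exception {
                    return null; // real work goes here
                }

                @Override
                public Object getJobId() {
                    return MetricsAwareJobs.class;
                }

                @Override
                public TaskExecutionMetricsHolder getTaskExecutionMetric() {
                    return taskMetrics;
                }
            };
        }
    }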

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 575f0f3..a28f614 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -94,8 +94,9 @@ public class DeleteCompiler {
         this.statement = statement;
     }
     
-    private static MutationState deleteRows(PhoenixStatement statement, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
+    private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
         PTable table = targetTableRef.getTable();
+        PhoenixStatement statement = childContext.getStatement();
         PhoenixConnection connection = statement.getConnection();
         PName tenantId = connection.getTenantId();
         byte[] tenantIdBytes = null;
@@ -114,19 +115,18 @@ public class DeleteCompiler {
         if (indexTableRef != null) {
             indexMutations = Maps.newHashMapWithExpectedSize(batchSize);
         }
-        try {
-            List<PColumn> pkColumns = table.getPKColumns();
-            boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
-            boolean isSharedViewIndex = table.getViewIndexId() != null;
-            int offset = (table.getBucketNum() == null ? 0 : 1);
-            byte[][] values = new byte[pkColumns.size()][];
-            if (isMultiTenant) {
-                values[offset++] = tenantIdBytes;
-            }
-            if (isSharedViewIndex) {
-                values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
-            }
-            PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, statement);
+        List<PColumn> pkColumns = table.getPKColumns();
+        boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
+        boolean isSharedViewIndex = table.getViewIndexId() != null;
+        int offset = (table.getBucketNum() == null ? 0 : 1);
+        byte[][] values = new byte[pkColumns.size()][];
+        if (isMultiTenant) {
+            values[offset++] = tenantIdBytes;
+        }
+        if (isSharedViewIndex) {
+            values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
+        }
+        try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
             int rowCount = 0;
             while (rs.next()) {
                 ImmutableBytesPtr ptr = new ImmutableBytesPtr();  // allocate new as this is a key in a Map
@@ -183,8 +183,6 @@ public class DeleteCompiler {
                 state.join(indexState);
             }
             return state;
-        } finally {
-            iterator.close();
         }
     }
     
@@ -199,9 +197,16 @@ public class DeleteCompiler {
         }
         
         @Override
-        protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
+        protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
             PhoenixStatement statement = new PhoenixStatement(connection);
-            return deleteRows(statement, targetTableRef, indexTableRef, iterator, projector, sourceTableRef);
+            /*
+             * We don't want to collect any read metrics within the child context, because any read metrics that
+             * need to be captured are already being collected in the parent statement context, which is enclosed
+             * in the result iterator used for reading rows out.
+             */
+            StatementContext ctx = new StatementContext(statement, false);
+            MutationState state = deleteRows(ctx, targetTableRef, indexTableRef, iterator, projector, sourceTableRef);
+            return state;
         }
         
         public void setTargetTableRef(TableRef tableRef) {
@@ -559,9 +564,14 @@ public class DeleteCompiler {
                             }
                             // Return total number of rows that have been deleted. In the case of auto commit being off,
                             // the mutations will all be in the mutation state of the current connection.
-                            return new MutationState(maxSize, connection, totalRowCount);
+                            MutationState state = new MutationState(maxSize, connection, totalRowCount);
+                            
+                            // Set the read metrics accumulated in the parent context so that they can be published when the mutations are committed.
+                            state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());
+                            
+                            return state;
                         } else {
-                            return deleteRows(statement, tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef());
+                            return deleteRows(plan.getContext(), tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef());
                         }
                     }
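
Editor's note: the hunk above threads read-side metrics into the write path: when rows are deleted server-side and only a count comes back, the plan context's ReadMetricQueue is attached to the fresh MutationState so commit-time publishing still sees the read work. A minimal sketch of that hand-off in isolation, using only accessors shown in this patch (the ReadMetricHandOff class and method name are hypothetical):

    import org.apache.phoenix.compile.QueryPlan;
    import org.apache.phoenix.execute.MutationState;
    import org.apache.phoenix.jdbc.PhoenixConnection;

    public class ReadMetricHandOff {
        // Builds a count-only MutationState and attaches the read metrics that
        // accumulated in the plan's context while the delete scan ran, so they
        // are published together with the mutation metrics on commit.
        public static MutationState withReadMetrics(QueryPlan plan, int maxSize,
                PhoenixConnection connection, long totalRowCount) {
            MutationState state = new MutationState(maxSize, connection, totalRowCount);
            state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());
            return state;
        }
    }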
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index bcac17d..630760c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -35,9 +35,9 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.KeyValueUtil;
 
 /**
@@ -53,21 +53,34 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
     /**
      * Method that does the actual mutation work
      */
-    abstract protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
+    abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
     
     @Override
-    public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator, Scan scan) throws SQLException {
-        final PhoenixConnection connection = new PhoenixConnection(this.connection);
-        MutationState state = mutate(context, iterator, connection);
+    public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName) throws SQLException {
+        final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
+        
+        MutationState state = mutate(parentContext, iterator, clonedConnection);
+        
         long totalRowCount = state.getUpdateCount();
-        if (connection.getAutoCommit()) {
-            connection.getMutationState().join(state);
-            connection.commit();
-            ConnectionQueryServices services = connection.getQueryServices();
-            int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
-            state = new MutationState(maxSize, connection, totalRowCount);
+        if (clonedConnection.getAutoCommit()) {
+            clonedConnection.getMutationState().join(state);
+            clonedConnection.commit();
+            ConnectionQueryServices services = clonedConnection.getQueryServices();
+            int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+            /*
+             * Everything mutated via the clonedConnection has already been committed. However, we want to
+             * report the mutation work done through the clonedConnection as part of the overall mutation work of
+             * the parent connection. So we set those metrics on the empty mutation state so that they can be
+             * combined with the parent connection's mutation metrics (as part of combining mutation state) in the
+             * close() method of the iterator being returned. Don't combine the read metrics into the parent context
+             * yet, though, because other threads may still be modifying them concurrently at this stage. Instead,
+             * we collect the read metrics once all the mutating iterators are done.
+             */
+            state = MutationState.emptyMutationState(maxSize, clonedConnection);
+            state.getMutationMetricQueue().combineMetricQueues(clonedConnection.getMutationState().getMutationMetricQueue());
         }
         final MutationState finalState = state;
+        
         byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
         KeyValue keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
         final Tuple tuple = new SingleKeyValueTuple(keyValue);
@@ -90,13 +103,17 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
             @Override
             public void close() throws SQLException {
                 try {
-                    // Join the child mutation states in close, since this is called in a single threaded manner
-                    // after the parallel results have been processed.
-                    if (!connection.getAutoCommit()) {
-                        MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState);
-                    }
+                    /* 
+                     * Join the child mutation states in close, since this is called in a single-threaded manner
+                     * after the parallel results have been processed.
+                     * If auto-commit is on for the cloned child connection, then the finalState here is an empty
+                     * mutation state (with no mutations). However, it still carries the metrics for the mutation
+                     * work done by the mutating iterator. Joining the mutation state ensures those metrics are
+                     * passed on to the parent connection.
+                     */ 
+                    MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState);
                 } finally {
-                    connection.close();
+                    clonedConnection.close();
                 }
             }
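
Editor's note: the auto-commit branch above is the subtle part: the cloned connection's mutations are already committed, so the state handed to close() is empty except for its metrics. A sketch of that roll-up in isolation, using the methods introduced in this patch (the wrapper class name is hypothetical):

    import org.apache.phoenix.execute.MutationState;
    import org.apache.phoenix.jdbc.PhoenixConnection;

    public class MutationMetricRollUp {
        // Returns an empty MutationState that carries only the mutation metrics
        // of the (already committed) cloned connection; joining it into the
        // parent's state later transfers those metrics upward.
        public static MutationState emptyStateWithMetrics(int maxSize, PhoenixConnection clonedConnection) {
            MutationState empty = MutationState.emptyMutationState(maxSize, clonedConnection);
            empty.getMutationMetricQueue().combineMetricQueues(
                    clonedConnection.getMutationState().getMutationMetricQueue());
            return empty;
        }
    }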
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index d726488..52bb7f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.monitoring.OverAllQueryMetrics;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -41,6 +43,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.NumberUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
 
 import com.google.common.collect.Maps;
 
@@ -80,10 +83,19 @@ public class StatementContext {
     private TimeRange scanTimeRange = null;
 
     private Map<SelectStatement, Object> subqueryResults;
-
+    private final ReadMetricQueue readMetricsQueue;
+    private final OverAllQueryMetrics overAllQueryMetrics;
+    
     public StatementContext(PhoenixStatement statement) {
         this(statement, new Scan());
     }
+    
+    /**
+     *  Constructor that lets you override whether or not to collect request level metrics.
+     */
+    public StatementContext(PhoenixStatement statement, boolean collectRequestLevelMetrics) {
+        this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan(), new SequenceManager(statement), collectRequestLevelMetrics);
+    }
 
     public StatementContext(PhoenixStatement statement, Scan scan) {
         this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, scan, new SequenceManager(statement));
@@ -94,6 +106,10 @@ public class StatementContext {
     }
 
     public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager) {
+        this(statement, resolver, scan, seqManager, statement.getConnection().isRequestLevelMetricsEnabled());
+    }
+    
+    public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager, boolean isRequestMetricsEnabled) {
         this.statement = statement;
         this.resolver = resolver;
         this.scan = scan;
@@ -102,20 +118,24 @@ public class StatementContext {
         this.aggregates = new AggregationManager();
         this.expressions = new ExpressionManager();
         PhoenixConnection connection = statement.getConnection();
-        this.dateFormat = connection.getQueryServices().getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
+        ReadOnlyProps props = connection.getQueryServices().getProps();
+        this.dateFormat = props.get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
         this.dateFormatter = DateUtil.getDateFormatter(dateFormat);
-        this.timeFormat = connection.getQueryServices().getProps().get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT);
+        this.timeFormat = props.get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT);
         this.timeFormatter = DateUtil.getTimeFormatter(timeFormat);
-        this.timestampFormat = connection.getQueryServices().getProps().get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
+        this.timestampFormat = props.get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
         this.timestampFormatter = DateUtil.getTimestampFormatter(timestampFormat);
-        this.dateFormatTimeZone = TimeZone.getTimeZone(
-                connection.getQueryServices().getProps().get(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB, DateUtil.DEFAULT_TIME_ZONE_ID));
-        this.numberFormat = connection.getQueryServices().getProps().get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
+        this.dateFormatTimeZone = TimeZone.getTimeZone(props.get(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB,
+                DateUtil.DEFAULT_TIME_ZONE_ID));
+        this.numberFormat = props.get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
         this.tempPtr = new ImmutableBytesWritable();
         this.currentTable = resolver != null && !resolver.getTables().isEmpty() ? resolver.getTables().get(0) : null;
-        this.whereConditionColumns = new ArrayList<Pair<byte[],byte[]>>();
-        this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer>emptyMap() : Maps.<PColumn, Integer>newLinkedHashMap();
-        this.subqueryResults = Maps.<SelectStatement, Object>newHashMap();
+        this.whereConditionColumns = new ArrayList<Pair<byte[], byte[]>>();
+        this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer> emptyMap() : Maps
+                .<PColumn, Integer> newLinkedHashMap();
+        this.subqueryResults = Maps.<SelectStatement, Object> newHashMap();
+        this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled);
+        this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled);
     }
 
     /**
@@ -285,4 +305,13 @@ public class StatementContext {
     public void setSubqueryResult(SelectStatement select, Object result) {
         subqueryResults.put(select, result);
     }
+    
+    public ReadMetricQueue getReadMetricsQueue() {
+        return readMetricsQueue;
+    }
+    
+    public OverAllQueryMetrics getOverallQueryMetrics() {
+        return overAllQueryMetrics;
+    }
+    
 }
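
Editor's note: with the constructors above, metrics collection becomes a per-context decision: the two-argument form lets a caller opt a child statement out while the parent keeps collecting. A short usage sketch, assuming an open PhoenixConnection (ChildContextExample is a hypothetical class):

    import org.apache.phoenix.compile.StatementContext;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.jdbc.PhoenixStatement;
    import org.apache.phoenix.monitoring.OverAllQueryMetrics;
    import org.apache.phoenix.monitoring.ReadMetricQueue;

    public class ChildContextExample {
        public static StatementContext newChildContext(PhoenixConnection connection) {
            PhoenixStatement statement = new PhoenixStatement(connection);
            // false = do not collect request-level metrics in this child context;
            // reads are already being metered by the enclosing parent context.
            StatementContext childContext = new StatementContext(statement, false);
            ReadMetricQueue reads = childContext.getReadMetricsQueue();          // present, but collection is off
            OverAllQueryMetrics overall = childContext.getOverallQueryMetrics(); // likewise
            return childContext;
        }
    }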

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 2b35d4f..7b39a28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -118,43 +118,40 @@ public class UpsertCompiler {
         mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter()));
     }
 
-    private static MutationState upsertSelect(PhoenixStatement statement, 
-            TableRef tableRef, RowProjector projector, ResultIterator iterator, int[] columnIndexes,
-            int[] pkSlotIndexes) throws SQLException {
-        try {
-            PhoenixConnection connection = statement.getConnection();
-            ConnectionQueryServices services = connection.getQueryServices();
-            int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
-            int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
-            boolean isAutoCommit = connection.getAutoCommit();
-            byte[][] values = new byte[columnIndexes.length][];
-            int rowCount = 0;
-            Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
-            PTable table = tableRef.getTable();
-            ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
+    private static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
+            ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes) throws SQLException {
+        PhoenixStatement statement = childContext.getStatement();
+        PhoenixConnection connection = statement.getConnection();
+        ConnectionQueryServices services = connection.getQueryServices();
+        int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+                QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+        int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+        boolean isAutoCommit = connection.getAutoCommit();
+        byte[][] values = new byte[columnIndexes.length][];
+        int rowCount = 0;
+        Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+        PTable table = tableRef.getTable();
+        try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
             while (rs.next()) {
                 for (int i = 0; i < values.length; i++) {
                     PColumn column = table.getColumns().get(columnIndexes[i]);
-                    byte[] bytes = rs.getBytes(i+1);
+                    byte[] bytes = rs.getBytes(i + 1);
                     ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
-                    Object value = rs.getObject(i+1);
-                    int rsPrecision = rs.getMetaData().getPrecision(i+1);
+                    Object value = rs.getObject(i + 1);
+                    int rsPrecision = rs.getMetaData().getPrecision(i + 1);
                     Integer precision = rsPrecision == 0 ? null : rsPrecision;
-                    int rsScale = rs.getMetaData().getScale(i+1);
+                    int rsScale = rs.getMetaData().getScale(i + 1);
                     Integer scale = rsScale == 0 ? null : rsScale;
                     // We are guaranteed that the two columns will have compatible types,
                     // as we checked that before.
-                    if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
-                            precision, scale,
-                            column.getMaxLength(),column.getScale())) {
-                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY)
-                            .setColumnName(column.getName().getString())
-                            .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build().buildException();
-                    }
-                    column.getDataType().coerceBytes(ptr, value, column.getDataType(),
-                            precision, scale, SortOrder.getDefault(),
-                            column.getMaxLength(), column.getScale(), column.getSortOrder());
+                    if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale,
+                            column.getMaxLength(), column.getScale())) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY)
+                                .setColumnName(column.getName().getString())
+                                .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null))
+                                .build().buildException();
+                    }
+                    column.getDataType().coerceBytes(ptr, value, column.getDataType(), precision, scale,
+                            SortOrder.getDefault(), column.getMaxLength(), column.getScale(), column.getSortOrder());
                     values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                 }
                 setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement);
@@ -169,8 +166,6 @@ public class UpsertCompiler {
             }
             // If auto commit is true, this last batch will be committed upon return
             return new MutationState(tableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
-        } finally {
-            iterator.close();
         }
     }
 
@@ -186,14 +181,21 @@ public class UpsertCompiler {
         }
 
         @Override
-        protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
-            if (context.getSequenceManager().getSequenceCount() > 0) {
+        protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
+            if (parentContext.getSequenceManager().getSequenceCount() > 0) {
                 throw new IllegalStateException("Cannot pipeline upsert when sequence is referenced");
             }
             PhoenixStatement statement = new PhoenixStatement(connection);
+            /*
+             * We don't want to collect any read metrics within the child context, because any read metrics that
+             * need to be captured are already being collected in the parent statement context, which is enclosed
+             * in the result iterator used for reading rows out.
+             */
+            StatementContext childContext = new StatementContext(statement, false);
             // Clone the row projector as it's not thread safe and would be used simultaneously by
             // multiple threads otherwise.
-            return upsertSelect(statement, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes);
+            MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes);
+            return state;
         }
         
         public void setRowProjector(RowProjector projector) {
@@ -669,7 +671,7 @@ public class UpsertCompiler {
                 public MutationState execute() throws SQLException {
                     ResultIterator iterator = queryPlan.iterator();
                     if (parallelIteratorFactory == null) {
-                        return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
+                        return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
                     }
                     try {
                         parallelIteratorFactory.setRowProjector(projector);
@@ -677,13 +679,21 @@ public class UpsertCompiler {
                         parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
                         Tuple tuple;
                         long totalRowCount = 0;
+                        StatementContext context = queryPlan.getContext();
                         while ((tuple=iterator.next()) != null) {// Runs query
                             Cell kv = tuple.getValue(0);
                             totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
                         }
                         // Return total number of rows that have been updated. In the case of auto commit being off
                         // the mutations will all be in the mutation state of the current connection.
-                        return new MutationState(maxSize, statement.getConnection(), totalRowCount);
+                        MutationState mutationState = new MutationState(maxSize, statement.getConnection(), totalRowCount);
+                        /*
+                         * All the metrics collected for measuring the reads done by the parallel mutating iterators
+                         * are included in the ReadMetricQueue of the statement context. Include these metrics in the
+                         * returned mutation state so they can be published on commit.
+                         */
+                        mutationState.setReadMetricQueue(context.getReadMetricsQueue());
+                        return mutationState; 
                     } finally {
                         iterator.close();
                     }
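
Editor's note: worth spelling out how the row count travels here: each mutating iterator (see MutatingParallelIteratorFactory above) hands back a single aggregate KeyValue whose value is a PLong-encoded count, and the driver loop above sums those per-chunk counts. The two halves of that contract, side by side, as a sketch (the RowCountCodec class is hypothetical; the encode/decode calls mirror this patch):

    import org.apache.hadoop.hbase.Cell;
    import org.apache.phoenix.schema.SortOrder;
    import org.apache.phoenix.schema.types.PLong;

    public class RowCountCodec {
        // Producer side: encode a chunk's row count as the single-cell payload,
        // as MutatingParallelIteratorFactory does before wrapping it in a KeyValue.
        public static byte[] encode(long rowCount) {
            return PLong.INSTANCE.toBytes(rowCount);
        }

        // Consumer side: decode one chunk's count from the aggregate cell,
        // exactly as the upsert-select driver loop above does.
        public static long decode(Cell kv) {
            return PLong.INSTANCE.getCodec().decodeLong(
                    kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
        }
    }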

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index ba137f8..00e843d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -102,7 +102,7 @@ public class AggregatePlan extends BaseQueryPlan {
             this.services = services;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
             Expression expression = RowKeyExpression.INSTANCE;
             OrderByExpression orderByExpression = new OrderByExpression(expression, false, true);
             int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
@@ -119,9 +119,9 @@ public class AggregatePlan extends BaseQueryPlan {
             this.outerFactory = outerFactory;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
-            PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan);
-            return outerFactory.newIterator(context, iterator, scan);
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
+            PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan, tableName);
+            return outerFactory.newIterator(context, iterator, scan, tableName);
         }
     }
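
Editor's note: the extra tableName parameter on newIterator, seen in both hunks above, is how per-table read metrics get their key. A hedged sketch of a pass-through implementation of the widened interface, assuming newIterator is the interface's only abstract method and that LookAheadResultIterator.wrap is available for making a plain iterator peekable:

    import java.sql.SQLException;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.compile.StatementContext;
    import org.apache.phoenix.iterate.LookAheadResultIterator;
    import org.apache.phoenix.iterate.ParallelIteratorFactory;
    import org.apache.phoenix.iterate.PeekingResultIterator;
    import org.apache.phoenix.iterate.ResultIterator;

    public class PassThroughIteratorFactory implements ParallelIteratorFactory {
        @Override
        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner,
                Scan scan, String tableName) throws SQLException {
            // tableName is available here for tagging per-table metrics; this
            // pass-through simply makes the scanner peekable and ignores it.
            return LookAheadResultIterator.wrap(scanner);
        }
    }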