You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/07/12 18:39:25 UTC

phoenix git commit: PHOENIX-3978 Expose mutation failures in our metrics

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 d70bb6514 -> 53a766355


PHOENIX-3978 Expose mutation failures in our metrics


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/53a76635
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/53a76635
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/53a76635

Branch: refs/heads/4.x-HBase-0.98
Commit: 53a7663552ae4c2ea7ee45067433f4d1dd0244e4
Parents: d70bb65
Author: Thomas <td...@salesforce.com>
Authored: Sat Jul 8 11:22:03 2017 -0700
Committer: Thomas <td...@salesforce.com>
Committed: Wed Jul 12 11:39:22 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/execute/PartialCommitIT.java |  23 ++-
 .../phoenix/monitoring/PhoenixMetricsIT.java    | 178 ++++++++++---------
 .../apache/phoenix/execute/MutationState.java   |  25 ++-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |   7 +-
 .../apache/phoenix/jdbc/PhoenixResultSet.java   |   5 +-
 .../apache/phoenix/monitoring/AtomicMetric.java |  11 +-
 .../phoenix/monitoring/CombinableMetric.java    |  12 +-
 .../monitoring/CombinableMetricImpl.java        |  11 +-
 .../phoenix/monitoring/GlobalClientMetrics.java |   2 +
 .../phoenix/monitoring/GlobalMetricImpl.java    |  11 +-
 .../org/apache/phoenix/monitoring/Metric.java   |  10 +-
 .../apache/phoenix/monitoring/MetricType.java   |  69 ++++---
 .../phoenix/monitoring/MutationMetricQueue.java |  25 ++-
 .../phoenix/monitoring/NonAtomicMetric.java     |  13 +-
 .../phoenix/monitoring/OverAllQueryMetrics.java |  16 +-
 .../phoenix/monitoring/ReadMetricQueue.java     |   8 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java |  71 ++++++--
 .../apache/phoenix/metrics/MetricTypeTest.java  |  42 +++++
 18 files changed, 333 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 52a6627..84edc84 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.execute;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Sets.newHashSet;
 import static java.util.Collections.singletonList;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_FAILED_COUNT;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -53,11 +55,15 @@ import org.apache.phoenix.end2end.BaseOwnClusterIT;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.monitoring.GlobalMetric;
+import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -88,11 +94,13 @@ public class PartialCommitIT extends BaseOwnClusterIT {
 
     @BeforeClass
     public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
         serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
         serverProps.put("hbase.coprocessor.abortonerror", "false");
         serverProps.put(Indexer.CHECK_VERSION_CONF_KEY, "false");
-        Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true");
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
+        clientProps.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
         createTablesWithABitOfData();
     }
@@ -151,6 +159,13 @@ public class PartialCommitIT extends BaseOwnClusterIT {
       TEST_UTIL.shutdownMiniCluster();
     }
     
+    @Before
+    public void resetGlobalMetrics() {
+        for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+            m.reset();
+        }
+    }
+    
     @Test
     public void testNoFailure() {
         testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " values ('testNoFailure', 'a')"), new int[0], false, singletonList("select count(*) from " + A_SUCESS_TABLE + " where k='testNoFailure'"),
@@ -247,8 +262,12 @@ public class PartialCommitIT extends BaseOwnClusterIT {
                 assertEquals(CommitException.class, sqle.getClass());
                 int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes();
                 assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes);
+                Map<String, Map<MetricType, Long>> mutationWriteMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(con);
+                assertEquals(expectedUncommittedStatementIndexes.length, mutationWriteMetrics.get(B_FAILURE_TABLE).get(MUTATION_BATCH_FAILED_SIZE).intValue());
+                assertEquals(expectedUncommittedStatementIndexes.length, GLOBAL_MUTATION_BATCH_FAILED_COUNT.getMetric().getTotalSum());
             }
             
+            
             // verify data in HBase
             for (int i = 0; i < countStatementsForVerification.size(); i++) {
                 String countStatement = countStatementsForVerification.get(i);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 2838f04..2c619c9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -11,6 +11,7 @@ package org.apache.phoenix.monitoring;
 
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_FAILED_COUNT;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
@@ -76,10 +77,10 @@ import com.google.common.collect.Sets;
 
 public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
 
-    private static final List<String> mutationMetricsToSkip = Lists
-            .newArrayList(MetricType.MUTATION_COMMIT_TIME.name());
-    private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(),
-            MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name());
+    private static final List<MetricType> mutationMetricsToSkip = Lists
+            .newArrayList(MetricType.MUTATION_COMMIT_TIME);
+    private static final List<MetricType> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME,
+            MetricType.TASK_EXECUTION_TIME, MetricType.TASK_END_TO_END_TIME);
     private static final String CUSTOM_URL_STRING = "SESSION";
     private static final AtomicInteger numConnections = new AtomicInteger(0); 
 
@@ -124,7 +125,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
         assertEquals(0, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum());
         assertEquals(0, GLOBAL_MUTATION_BYTES.getMetric().getTotalSum());
-        assertEquals(0, GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_MUTATION_BATCH_FAILED_COUNT.getMetric().getTotalSum());
 
         assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0);
         assertTrue(GLOBAL_QUERY_TIME.getMetric().getTotalSum() > 0);
@@ -146,6 +147,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
         assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
         assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_MUTATION_BATCH_FAILED_COUNT.getMetric().getTotalSum());
     }
 
     @Test
@@ -173,6 +175,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum());
         assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
         assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
+        assertEquals(0, GLOBAL_MUTATION_BATCH_FAILED_COUNT.getMetric().getTotalSum());
     }
 
     private static void resetGlobalMetrics() {
@@ -244,34 +247,39 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         int numRows = 10;
         Connection conn = insertRowsInTable(tableName, numRows);
         PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
-        Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
-        for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) {
+        Map<String, Map<MetricType, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(pConn);
+        for (Entry<String, Map<MetricType, Long>> entry : mutationMetrics.entrySet()) {
             String t = entry.getKey();
             assertEquals("Table names didn't match!", tableName, t);
-            Map<String, Long> p = entry.getValue();
-            assertEquals("There should have been three metrics", 3, p.size());
+            Map<MetricType, Long> p = entry.getValue();
+            assertEquals("There should have been four metrics", 4, p.size());
             boolean mutationBatchSizePresent = false;
             boolean mutationCommitTimePresent = false;
             boolean mutationBytesPresent = false;
-            for (Entry<String, Long> metric : p.entrySet()) {
-                String metricName = metric.getKey();
+            boolean mutationBatchFailedPresent = false;
+            for (Entry<MetricType, Long> metric : p.entrySet()) {
+            	MetricType metricType = metric.getKey();
                 long metricValue = metric.getValue();
-                if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) {
+                if (metricType.equals(MetricType.MUTATION_BATCH_SIZE)) {
                     assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
                     mutationBatchSizePresent = true;
-                } else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) {
+                } else if (metricType.equals(MetricType.MUTATION_COMMIT_TIME)) {
                     assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
                     mutationCommitTimePresent = true;
-                } else if (metricName.equals(MetricType.MUTATION_BYTES.name())) {
+                } else if (metricType.equals(MetricType.MUTATION_BYTES)) {
                     assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
                     mutationBytesPresent = true;
+                } else if (metricType.equals(MetricType.MUTATION_BATCH_FAILED_SIZE)) {
+                    assertEquals("Zero failed mutations expected", 0, metricValue);
+                    mutationBatchFailedPresent = true;
                 }
             }
             assertTrue(mutationBatchSizePresent);
             assertTrue(mutationCommitTimePresent);
             assertTrue(mutationBytesPresent);
+            assertTrue(mutationBytesPresent);
         }
-        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        Map<String, Map<MetricType, Long>> readMetrics = PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(pConn);
         assertEquals("Read metrics should be empty", 0, readMetrics.size());
     }
 
@@ -299,9 +307,9 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         conn.commit();
         PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
 
-        Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+        Map<String, Map<MetricType, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(pConn);
         assertMutationMetrics(tableName2, numRows, mutationMetrics);
-        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        Map<String, Map<MetricType, Long>> readMetrics = PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(pConn);
         assertReadMetricsForMutatingSql(tableName1, table1SaltBuckets, readMetrics);
     }
 
@@ -321,10 +329,10 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         conn.createStatement().execute(delete);
         conn.commit();
         PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
-        Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
+        Map<String, Map<MetricType, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(pConn);
         assertMutationMetrics(tableName, numRows, mutationMetrics);
 
-        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+        Map<String, Map<MetricType, Long>> readMetrics = PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(pConn);
         assertReadMetricsForMutatingSql(tableName, tableSaltBuckets, readMetrics);
     }
 
@@ -345,11 +353,11 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
         while (rs.next()) {}
         rs.close();
-        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getRequestReadMetrics(rs);
+        Map<String, Map<MetricType, Long>> readMetrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
         assertTrue("No read metrics should have been generated", readMetrics.size() == 0);
         conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " VALUES ('KEY', 'VALUE')");
         conn.commit();
-        Map<String, Map<String, Long>> writeMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+        Map<String, Map<MetricType, Long>> writeMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn);
         assertTrue("No write metrics should have been generated", writeMetrics.size() == 0);
     }
 
@@ -365,20 +373,20 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
 
         String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
         int numRows = 10;
-        Map<String, Map<String, Long>> mutationMetricsForAutoCommitOff = null;
+        Map<String, Map<MetricType, Long>> mutationMetricsForAutoCommitOff = null;
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.setAutoCommit(false);
             upsertRows(upsert, numRows, conn);
             conn.commit();
-            mutationMetricsForAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+            mutationMetricsForAutoCommitOff = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn);
         }
 
         // Insert rows now with auto-commit on
-        Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null;
+        Map<String, Map<MetricType, Long>> mutationMetricsAutoCommitOn = null;
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.setAutoCommit(true);
             upsertRows(upsert, numRows, conn);
-            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn);
         }
         // Verify that the mutation metrics are same for both cases
         assertMetricsAreSame(mutationMetricsForAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
@@ -413,11 +421,11 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
 
         String delete = "DELETE FROM " + tableName;
         // Delete rows now with auto-commit off
-        Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOff = null;
+        Map<String, Map<MetricType, Long>> deleteMetricsWithAutoCommitOff = null;
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.setAutoCommit(false);
             conn.createStatement().executeUpdate(delete);
-            deleteMetricsWithAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+            deleteMetricsWithAutoCommitOff = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn);
         }
 
         // Upsert the rows back
@@ -428,11 +436,11 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         }
 
         // Now delete rows with auto-commit on
-        Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOn = null;
+        Map<String, Map<MetricType, Long>> deleteMetricsWithAutoCommitOn = null;
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.setAutoCommit(true);
             conn.createStatement().executeUpdate(delete);
-            deleteMetricsWithAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+            deleteMetricsWithAutoCommitOn = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn);
         }
 
         // Verify that the mutation metrics are same for both cases.
@@ -472,19 +480,19 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
 
         String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
 
-        Map<String, Map<String, Long>> mutationMetricsAutoCommitOff = null;
-        Map<String, Map<String, Long>> readMetricsAutoCommitOff = null;
+        Map<String, Map<MetricType, Long>> mutationMetricsAutoCommitOff = null;
+        Map<String, Map<MetricType, Long>> readMetricsAutoCommitOff = null;
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.setAutoCommit(false);
             conn.createStatement().executeUpdate(upsertSelect);
             conn.commit();
             PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
-            mutationMetricsAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
-            readMetricsAutoCommitOff = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+            mutationMetricsAutoCommitOff = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOff = PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(pConn);
         }
 
-        Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null;
-        Map<String, Map<String, Long>> readMetricsAutoCommitOn = null;
+        Map<String, Map<MetricType, Long>> mutationMetricsAutoCommitOn = null;
+        Map<String, Map<MetricType, Long>> readMetricsAutoCommitOn = null;
 
         int autoCommitBatchSize = numRows + 1; // batchsize = 11 is less than numRows and is not a divisor of batchsize
         Properties props = new Properties();
@@ -493,8 +501,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             conn.setAutoCommit(true);
             conn.createStatement().executeUpdate(upsertSelect);
             PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
-            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
-            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(pConn);
         }
         assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
         assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
@@ -506,8 +514,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             conn.setAutoCommit(true);
             conn.createStatement().executeUpdate(upsertSelect);
             PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
-            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
-            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(pConn);
         }
         assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
         assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
@@ -519,8 +527,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             conn.setAutoCommit(true);
             conn.createStatement().executeUpdate(upsertSelect);
             PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
-            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
-            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(pConn);
         }
         assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
         assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip);
@@ -532,8 +540,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             conn.setAutoCommit(true);
             conn.createStatement().executeUpdate(upsertSelect);
             PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
-            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
-            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
+            mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(pConn);
+            readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(pConn);
         }
         assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip);
         assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOff, readMetricsToSkip);
@@ -548,7 +556,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             createTableAndInsertValues(true, 10, conn, table2);
             String table3 = generateUniqueName();
             createTableAndInsertValues(true, 10, conn, table3);
-            Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+            Map<String, Map<MetricType, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn);
             assertTrue("Mutation metrics not present for " + table1, mutationMetrics.get(table1) != null);
             assertTrue("Mutation metrics not present for " + table2, mutationMetrics.get(table2) != null);
             assertTrue("Mutation metrics not present for " + table3, mutationMetrics.get(table3) != null);
@@ -563,12 +571,12 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         try {
             conn = DriverManager.getConnection(getUrl());
             createTableAndInsertValues(true, 10, conn, generateUniqueName());
-            assertTrue("Mutation metrics not present", PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() > 0);
+            assertTrue("Mutation metrics not present", PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).size() > 0);
         } finally {
             if (conn != null) {
                 conn.close();
                 assertTrue("Closing connection didn't clear metrics",
-                        PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() == 0);
+                        PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).size() == 0);
             }
         }
     }
@@ -608,7 +616,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
                 stmt.executeUpdate();
             }
             conn.commit();
-            Map<String, Map<String, Long>> metrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn);
+            Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn);
             assertTrue(metrics.get(dataTable).size() > 0);
             assertTrue(metrics.get(index1).size() > 0);
             assertTrue(metrics.get(index2).size() > 0);
@@ -644,26 +652,26 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
-    private void assertMetricsAreSame(Map<String, Map<String, Long>> metric1, Map<String, Map<String, Long>> metric2,
-            List<String> metricsToSkip) {
+    private void assertMetricsAreSame(Map<String, Map<MetricType, Long>> metric1, Map<String, Map<MetricType, Long>> metric2,
+            List<MetricType> metricsToSkip) {
         assertTrue("The two metrics have different or unequal number of table names ",
                 metric1.keySet().equals(metric2.keySet()));
-        for (Entry<String, Map<String, Long>> entry : metric1.entrySet()) {
-            Map<String, Long> metricNameValueMap1 = entry.getValue();
-            Map<String, Long> metricNameValueMap2 = metric2.get(entry.getKey());
+        for (Entry<String, Map<MetricType, Long>> entry : metric1.entrySet()) {
+            Map<MetricType, Long> metricNameValueMap1 = entry.getValue();
+            Map<MetricType, Long> metricNameValueMap2 = metric2.get(entry.getKey());
             assertMetricsHaveSameValues(metricNameValueMap1, metricNameValueMap2, metricsToSkip);
         }
     }
 
-    private void assertMetricsHaveSameValues(Map<String, Long> metricNameValueMap1,
-            Map<String, Long> metricNameValueMap2, List<String> metricsToSkip) {
+    private void assertMetricsHaveSameValues(Map<MetricType, Long> metricNameValueMap1,
+            Map<MetricType, Long> metricNameValueMap2, List<MetricType> metricsToSkip) {
         assertTrue("The two metrics have different or unequal number of metric names ", metricNameValueMap1.keySet()
                 .equals(metricNameValueMap2.keySet()));
-        for (Entry<String, Long> entry : metricNameValueMap1.entrySet()) {
-            String metricName = entry.getKey();
-            if (!metricsToSkip.contains(metricName)) {
-                assertEquals("Unequal values for metric " + metricName, entry.getValue(),
-                        metricNameValueMap2.get(metricName));
+        for (Entry<MetricType, Long> entry : metricNameValueMap1.entrySet()) {
+        	MetricType metricType = entry.getKey();
+            if (!metricsToSkip.contains(metricType)) {
+                assertEquals("Unequal values for metric " + metricType, entry.getValue(),
+                        metricNameValueMap2.get(metricType));
             }
         }
     }
@@ -678,32 +686,32 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
 
     private void assertReadMetricValuesForSelectSql(ArrayList<Long> numRows, ArrayList<Long> numExpectedTasks,
             PhoenixResultSet resultSetBeingTested, Set<String> expectedTableNames) throws SQLException {
-        Map<String, Map<String, Long>> metrics = PhoenixRuntime.getRequestReadMetrics(resultSetBeingTested);
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(resultSetBeingTested);
         int counter = 0;
-        for (Entry<String, Map<String, Long>> entry : metrics.entrySet()) {
+        for (Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) {
             String tableName = entry.getKey();
             expectedTableNames.remove(tableName);
-            Map<String, Long> metricValues = entry.getValue();
+            Map<MetricType, Long> metricValues = entry.getValue();
             boolean scanMetricsPresent = false;
             boolean taskCounterMetricsPresent = false;
             boolean taskExecutionTimeMetricsPresent = false;
             boolean memoryMetricsPresent = false;
-            for (Entry<String, Long> pair : metricValues.entrySet()) {
-                String metricName = pair.getKey();
+            for (Entry<MetricType, Long> pair : metricValues.entrySet()) {
+            	MetricType metricType = pair.getKey();
                 long metricValue = pair.getValue();
                 long n = numRows.get(counter);
                 long numTask = numExpectedTasks.get(counter);
-                if (metricName.equals(SCAN_BYTES.name())) {
+                if (metricType.equals(SCAN_BYTES)) {
                     // we are using a SCAN_BYTES_DELTA of 1. So number of scan bytes read should be number of rows read
                     assertEquals(n, metricValue);
                     scanMetricsPresent = true;
-                } else if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
+                } else if (metricType.equals(TASK_EXECUTED_COUNTER)) {
                     assertEquals(numTask, metricValue);
                     taskCounterMetricsPresent = true;
-                } else if (metricName.equals(TASK_EXECUTION_TIME.name())) {
+                } else if (metricType.equals(TASK_EXECUTION_TIME)) {
                     assertEquals(numTask * TASK_EXECUTION_TIME_DELTA, metricValue);
                     taskExecutionTimeMetricsPresent = true;
-                } else if (metricName.equals(MEMORY_CHUNK_BYTES.name())) {
+                } else if (metricType.equals(MEMORY_CHUNK_BYTES)) {
                     assertEquals(numTask * MEMORY_CHUNK_BYTES_DELTA, metricValue);
                     memoryMetricsPresent = true;
                 }
@@ -780,21 +788,21 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     private void assertReadMetricsForMutatingSql(String tableName, long tableSaltBuckets,
-            Map<String, Map<String, Long>> readMetrics) {
+            Map<String, Map<MetricType, Long>> readMetrics) {
         assertTrue("No read metrics present when there should have been!", readMetrics.size() > 0);
         int numTables = 0;
-        for (Entry<String, Map<String, Long>> entry : readMetrics.entrySet()) {
+        for (Entry<String, Map<MetricType, Long>> entry : readMetrics.entrySet()) {
             String t = entry.getKey();
             assertEquals("Table name didn't match for read metrics", tableName, t);
             numTables++;
-            Map<String, Long> p = entry.getValue();
+            Map<MetricType, Long> p = entry.getValue();
             assertTrue("No read metrics present when there should have been", p.size() > 0);
-            for (Entry<String, Long> metric : p.entrySet()) {
-                String metricName = metric.getKey();
+            for (Entry<MetricType, Long> metric : p.entrySet()) {
+            	MetricType metricType = metric.getKey();
                 long metricValue = metric.getValue();
-                if (metricName.equals(TASK_EXECUTED_COUNTER.name())) {
+                if (metricType.equals(TASK_EXECUTED_COUNTER)) {
                     assertEquals(tableSaltBuckets, metricValue);
-                } else if (metricName.equals(SCAN_BYTES.name())) {
+                } else if (metricType.equals(SCAN_BYTES)) {
                     assertTrue("Scan bytes read should be greater than zero", metricValue > 0);
                 }
             }
@@ -802,22 +810,24 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         assertEquals("There should have been read metrics only for one table: " + tableName, 1, numTables);
     }
 
-    private void assertMutationMetrics(String tableName, int numRows, Map<String, Map<String, Long>> mutationMetrics) {
+    private void assertMutationMetrics(String tableName, int numRows, Map<String, Map<MetricType, Long>> mutationMetrics) {
         assertTrue("No mutation metrics present when there should have been", mutationMetrics.size() > 0);
-        for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) {
+        for (Entry<String, Map<MetricType, Long>> entry : mutationMetrics.entrySet()) {
             String t = entry.getKey();
             assertEquals("Table name didn't match for mutation metrics", tableName, t);
-            Map<String, Long> p = entry.getValue();
-            assertEquals("There should have been three metrics", 3, p.size());
-            for (Entry<String, Long> metric : p.entrySet()) {
-                String metricName = metric.getKey();
+            Map<MetricType, Long> p = entry.getValue();
+            assertEquals("There should have been four metrics", 4, p.size());
+            for (Entry<MetricType, Long> metric : p.entrySet()) {
+            	MetricType metricType = metric.getKey();
                 long metricValue = metric.getValue();
-                if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) {
+                if (metricType.equals(MetricType.MUTATION_BATCH_SIZE)) {
                     assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
-                } else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) {
+                } else if (metricType.equals(MetricType.MUTATION_COMMIT_TIME)) {
                     assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
-                } else if (metricName.equals(MetricType.MUTATION_BYTES.name())) {
+                } else if (metricType.equals(MetricType.MUTATION_BYTES)) {
                     assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
+                } else if (metricType.equals(MetricType.MUTATION_BATCH_FAILED_SIZE)) {
+                    assertEquals("Zero failed mutations expected", 0, metricValue);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index e18dc9f..16d2f1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.execute;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_FAILED_COUNT;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
@@ -966,6 +967,11 @@ public class MutationState implements SQLCloseable {
 
                 int retryCount = 0;
                 boolean shouldRetry = false;
+                long numMutations = 0;
+                long mutationSizeBytes = 0;
+                long mutationCommitTime = 0;
+                long numFailedMutations = 0;;
+                long startTime = 0;
                 do {
                     TableRef origTableRef = tableInfo.getOrigTableRef();
                     PTable table = origTableRef.getTable();
@@ -993,10 +999,11 @@ public class MutationState implements SQLCloseable {
                             hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table);
                         }
                         
-                        long numMutations = mutationList.size();
+                        numMutations = mutationList.size();
                         GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
+                        mutationSizeBytes = calculateMutationSize(mutationList);
                         
-                        long startTime = System.currentTimeMillis();
+                        startTime = System.currentTimeMillis();
                         child.addTimelineAnnotation("Attempt " + retryCount);
                         List<List<Mutation>> mutationBatchList = getMutationBatchList(batchSize, batchSizeBytes, mutationList);
                         for (List<Mutation> mutationBatch : mutationBatchList) {
@@ -1007,18 +1014,17 @@ public class MutationState implements SQLCloseable {
                         child.stop();
                         child.stop();
                         shouldRetry = false;
-                        long mutationCommitTime = System.currentTimeMillis() - startTime;
+                        mutationCommitTime = System.currentTimeMillis() - startTime;
                         GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
+                        numFailedMutations = 0;
                         
-                        long mutationSizeBytes = calculateMutationSize(mutationList);
-                        MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
-                        mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
                         if (tableInfo.isDataTable()) {
                             numRows -= numMutations;
                         }
                         // Remove batches as we process them
                         mutations.remove(origTableRef);
                     } catch (Exception e) {
+                    	mutationCommitTime = System.currentTimeMillis() - startTime;
                         serverTimestamp = ServerUtil.parseServerTimestamp(e);
                         SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
                         if (inferredE != null) {
@@ -1041,8 +1047,13 @@ public class MutationState implements SQLCloseable {
                         }
                         // Throw to client an exception that indicates the statements that
                         // were not committed successfully.
-                        sqlE = new CommitException(e, getUncommittedStatementIndexes(), serverTimestamp);
+                        int[] uncommittedStatementIndexes = getUncommittedStatementIndexes();
+						sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp);
+						numFailedMutations = uncommittedStatementIndexes.length;
+						GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations);
                     } finally {
+                    	MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations);
+                        mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
                         try {
                             if (cache!=null) 
                                 cache.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 33c54e0..71b98ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -72,6 +72,7 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.iterate.TableResultIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser;
+import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -1006,12 +1007,12 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         this.traceScope = traceScope;
     }
     
-    public Map<String, Map<String, Long>> getMutationMetrics() {
+    public Map<String, Map<MetricType, Long>> getMutationMetrics() {
         return mutationState.getMutationMetricQueue().aggregate();
     }
     
-    public Map<String, Map<String, Long>> getReadMetrics() {
-        return mutationState.getReadMetricQueue() != null ? mutationState.getReadMetricQueue().aggregate() : Collections.<String, Map<String, Long>>emptyMap();
+    public Map<String, Map<MetricType, Long>> getReadMetrics() {
+        return mutationState.getReadMetricQueue() != null ? mutationState.getReadMetricQueue().aggregate() : Collections.<String, Map<MetricType, Long>>emptyMap();
     }
     
     public boolean isRequestLevelMetricsEnabled() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 3ca48a1..d3ec151 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -50,6 +50,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.monitoring.OverAllQueryMetrics;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.schema.tuple.ResultTuple;
@@ -1290,11 +1291,11 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
         return scanner;
     }
     
-    public Map<String, Map<String, Long>> getReadMetrics() {
+    public Map<String, Map<MetricType, Long>> getReadMetrics() {
         return readMetricsQueue.aggregate();
     }
 
-    public Map<String, Long> getOverAllRequestReadMetrics() {
+    public Map<MetricType, Long> getOverAllRequestReadMetrics() {
         return overAllQueryMetrics.publish();
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
index c79d34b..728e734 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
@@ -33,13 +33,8 @@ public class AtomicMetric implements Metric {
     }
 
     @Override
-    public String getName() {
-        return type.name();
-    }
-
-    @Override
-    public String getDescription() {
-        return type.description();
+    public MetricType getMetricType() {
+        return type;
     }
 
     @Override
@@ -59,7 +54,7 @@ public class AtomicMetric implements Metric {
 
     @Override
     public String getCurrentMetricState() {
-        return getName() + ": " + value.get();
+        return getMetricType().shortName() + ": " + value.get();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
index ededb41..db51c56 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
@@ -35,14 +35,9 @@ public interface CombinableMetric extends Metric {
         private static final String EMPTY_STRING = "";
 
         @Override
-        public String getName() {
-            return EMPTY_STRING;
-        }
-
-        @Override
-        public String getDescription() {
-            return EMPTY_STRING;
-        }
+		public MetricType getMetricType() {
+			return MetricType.NO_OP_METRIC;
+		}
 
         @Override
         public long getValue() {
@@ -75,6 +70,7 @@ public interface CombinableMetric extends Metric {
 
         @Override
         public void decrement() {}
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
index 1972095..88049c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
@@ -28,14 +28,9 @@ public class CombinableMetricImpl implements CombinableMetric {
     }
 
     @Override
-    public String getName() {
-        return metric.getName();
-    }
-
-    @Override
-    public String getDescription() {
-        return metric.getDescription();
-    }
+	public MetricType getMetricType() {
+		return metric.getMetricType();
+	}
 
     @Override
     public long getValue() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index b5f9422..e125fd2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_
 import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
@@ -60,6 +61,7 @@ public enum GlobalClientMetrics {
     GLOBAL_MUTATION_BATCH_SIZE(MUTATION_BATCH_SIZE),
     GLOBAL_MUTATION_BYTES(MUTATION_BYTES),
     GLOBAL_MUTATION_COMMIT_TIME(MUTATION_COMMIT_TIME),
+    GLOBAL_MUTATION_BATCH_FAILED_COUNT(MUTATION_BATCH_FAILED_SIZE),
     GLOBAL_QUERY_TIME(QUERY_TIME),
     GLOBAL_NUM_PARALLEL_SCANS(NUM_PARALLEL_SCANS),
     GLOBAL_SCAN_BYTES(SCAN_BYTES),

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
index 25c0dfb..ce692f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
@@ -53,14 +53,9 @@ public class GlobalMetricImpl implements GlobalMetric {
     }
 
     @Override
-    public String getName() {
-        return metric.getName();
-    }
-
-    @Override
-    public String getDescription() {
-        return metric.getDescription();
-    }
+	public MetricType getMetricType() {
+		return metric.getMetricType();
+	}
 
     @Override
     public long getValue() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
index 53c91e7..0e51fc0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
@@ -21,15 +21,11 @@ package org.apache.phoenix.monitoring;
  * Interface that represents phoenix-internal metric.
  */
 public interface Metric {
+    
     /**
-     * @return Name of the metric
-     */
-    public String getName();
-
-    /**
-     * @return Description of the metric
+     * @return type of the metric
      */
-    public String getDescription();
+    public MetricType getMetricType();
 
     /**
      * @return Current value of the metric

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 7b21de5..0c72e34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -19,43 +19,56 @@ package org.apache.phoenix.monitoring;
 
 public enum MetricType {
 
-    MUTATION_BATCH_SIZE("Batch sizes of mutations"),
-    MUTATION_BYTES("Size of mutations in bytes"),
-    MUTATION_COMMIT_TIME("Time it took to commit mutations"),
-    QUERY_TIME("Query times"),
-    NUM_PARALLEL_SCANS("Number of scans that were executed in parallel"),
-    SCAN_BYTES("Number of bytes read by scans"),
-    MEMORY_CHUNK_BYTES("Number of bytes allocated by the memory manager"),
-    MEMORY_WAIT_TIME("Number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
-    MUTATION_SQL_COUNTER("Counter for number of mutation sql statements"),
-    SELECT_SQL_COUNTER("Counter for number of sql queries"),
-    TASK_QUEUE_WAIT_TIME("Time in milliseconds tasks had to wait in the queue of the thread pool executor"),
-    TASK_END_TO_END_TIME("Time in milliseconds spent by tasks from creation to completion"),
-    TASK_EXECUTION_TIME("Time in milliseconds tasks took to execute"),
-    TASK_EXECUTED_COUNTER("Counter for number of tasks submitted to the thread pool executor"),
-    TASK_REJECTED_COUNTER("Counter for number of tasks that were rejected by the thread pool executor"),
-    QUERY_TIMEOUT_COUNTER("Number of times query timed out"),
-    QUERY_FAILED_COUNTER("Number of times query failed"),
-    SPOOL_FILE_SIZE("Size of spool files created in bytes"),
-    SPOOL_FILE_COUNTER("Number of spool files created"),
-    CACHE_REFRESH_SPLITS_COUNTER("Number of times cache was refreshed because of splits"),
-    WALL_CLOCK_TIME_MS("Wall clock time elapsed for the overall query execution"),
-    RESULT_SET_TIME_MS("Wall clock time elapsed for reading all records using resultSet.next()"),
-    OPEN_PHOENIX_CONNECTIONS_COUNTER("Number of open phoenix connections"),
-    QUERY_SERVICES_COUNTER("Number of ConnectionQueryServicesImpl instantiated"),
-    HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver"),
-    PHOENIX_CONNECTIONS_THROTTLED_COUNTER("Number of client Phoenix connections prevented from opening " +
+	NO_OP_METRIC("no", "No op metric"),
+	// mutation (write) related metrics 
+    MUTATION_BATCH_SIZE("mc", "Number of mutations in the batch"),
+    MUTATION_BYTES("mb", "Size of mutations in bytes"),
+    MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of mutations"),
+    MUTATION_BATCH_FAILED_SIZE("mfc", "Number of mutations that failed to be committed"),
+    MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql statements"),
+    // query (read) related metrics
+    QUERY_TIME("qt", "Query times"),
+    QUERY_TIMEOUT_COUNTER("qo", "Number of times query timed out"),
+    QUERY_FAILED_COUNTER("qf", "Number of times query failed"),
+    NUM_PARALLEL_SCANS("ps", "Number of scans that were executed in parallel"),
+    SCAN_BYTES("sb", "Number of bytes read by scans"),
+    SELECT_SQL_COUNTER("sc", "Counter for number of sql queries"),
+    // task metrics
+    TASK_QUEUE_WAIT_TIME("tw", "Time in milliseconds tasks had to wait in the queue of the thread pool executor"),
+    TASK_END_TO_END_TIME("tee", "Time in milliseconds spent by tasks from creation to completion"),
+    TASK_EXECUTION_TIME("tx", "Time in milliseconds tasks took to execute"),
+    TASK_EXECUTED_COUNTER("te", "Counter for number of tasks submitted to the thread pool executor"),
+    TASK_REJECTED_COUNTER("tr", "Counter for number of tasks that were rejected by the thread pool executor"),
+    // spool metrics
+    SPOOL_FILE_SIZE("ss", "Size of spool files created in bytes"),
+    SPOOL_FILE_COUNTER("sn", "Number of spool files created"),
+    // misc metrics
+    MEMORY_CHUNK_BYTES("mc", "Number of bytes allocated by the memory manager"),
+    MEMORY_WAIT_TIME("mw", "Number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
+    CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits"),
+    WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution"),
+    RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()"),
+    OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections"),
+    QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated"),
+    HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver"),
+    PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix connections prevented from opening " +
                                               "because there are already too many to that target cluster."),
-    PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("Number of requests for Phoenix connections, whether successful or not.");
+    PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix connections, whether successful or not.");
     
     private final String description;
+    private final String shortName;
 
-    private MetricType(String description) {
+    private MetricType(String shortName, String description) {
+    	this.shortName = shortName;
         this.description = description;
     }
 
     public String description() {
         return description;
     }
+    
+    public String shortName() {
+        return shortName;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
index e90da46..3de2be1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.monitoring;
 
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
@@ -54,19 +55,20 @@ public class MutationMetricQueue {
      * Publish the metrics to wherever you want them published. The internal state is cleared out after every publish.
      * @return map of table name -> list of pair of (metric name, metric value)
      */
-    public Map<String, Map<String, Long>> aggregate() {
-        Map<String, Map<String, Long>> publishedMetrics = new HashMap<>();
+    public Map<String, Map<MetricType, Long>> aggregate() {
+        Map<String, Map<MetricType, Long>> publishedMetrics = new HashMap<>();
         for (Entry<String, MutationMetric> entry : tableMutationMetric.entrySet()) {
             String tableName = entry.getKey();
             MutationMetric metric = entry.getValue();
-            Map<String, Long> publishedMetricsForTable = publishedMetrics.get(tableName);
+            Map<MetricType, Long> publishedMetricsForTable = publishedMetrics.get(tableName);
             if (publishedMetricsForTable == null) {
                 publishedMetricsForTable = new HashMap<>();
                 publishedMetrics.put(tableName, publishedMetricsForTable);
             }
-            publishedMetricsForTable.put(metric.getNumMutations().getName(), metric.getNumMutations().getValue());
-            publishedMetricsForTable.put(metric.getMutationsSizeBytes().getName(), metric.getMutationsSizeBytes().getValue());
-            publishedMetricsForTable.put(metric.getCommitTimeForMutations().getName(), metric.getCommitTimeForMutations().getValue());
+            publishedMetricsForTable.put(metric.getNumMutations().getMetricType(), metric.getNumMutations().getValue());
+            publishedMetricsForTable.put(metric.getMutationsSizeBytes().getMetricType(), metric.getMutationsSizeBytes().getValue());
+            publishedMetricsForTable.put(metric.getCommitTimeForMutations().getMetricType(), metric.getCommitTimeForMutations().getValue());
+            publishedMetricsForTable.put(metric.getNumFailedMutations().getMetricType(), metric.getNumFailedMutations().getValue());
         }
         return publishedMetrics;
     }
@@ -82,11 +84,13 @@ public class MutationMetricQueue {
         private final CombinableMetric numMutations = new CombinableMetricImpl(MUTATION_BATCH_SIZE);
         private final CombinableMetric mutationsSizeBytes = new CombinableMetricImpl(MUTATION_BYTES);
         private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME);
+        private final CombinableMetric numFailedMutations = new CombinableMetricImpl(MUTATION_BATCH_FAILED_SIZE);
 
-        public MutationMetric(long numMutations, long mutationsSizeBytes, long commitTimeForMutations) {
+        public MutationMetric(long numMutations, long mutationsSizeBytes, long commitTimeForMutations, long numFailedMutations) {
             this.numMutations.change(numMutations);
             this.mutationsSizeBytes.change(mutationsSizeBytes);
             this.totalCommitTimeForMutations.change(commitTimeForMutations);
+            this.numFailedMutations.change(numFailedMutations);
         }
 
         public CombinableMetric getCommitTimeForMutations() {
@@ -100,11 +104,16 @@ public class MutationMetricQueue {
         public CombinableMetric getMutationsSizeBytes() {
             return mutationsSizeBytes;
         }
+        
+        public CombinableMetric getNumFailedMutations() {
+            return numFailedMutations;
+        }
 
         public void combineMetric(MutationMetric other) {
             this.numMutations.combine(other.numMutations);
             this.mutationsSizeBytes.combine(other.mutationsSizeBytes);
             this.totalCommitTimeForMutations.combine(other.totalCommitTimeForMutations);
+            this.numFailedMutations.combine(other.numFailedMutations);
         }
 
     }
@@ -123,7 +132,7 @@ public class MutationMetricQueue {
         public void addMetricsForTable(String tableName, MutationMetric metric) {}
 
         @Override
-        public Map<String, Map<String, Long>> aggregate() { return Collections.emptyMap(); }
+        public Map<String, Map<MetricType, Long>> aggregate() { return Collections.emptyMap(); }
         
         
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
index 5205228..4e611c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
@@ -34,14 +34,9 @@ class NonAtomicMetric implements Metric {
     }
 
     @Override
-    public String getName() {
-        return type.name();
-    }
-
-    @Override
-    public String getDescription() {
-        return type.description();
-    }
+	public MetricType getMetricType() {
+		return type;
+	}
 
     @Override
     public long getValue() {
@@ -60,7 +55,7 @@ class NonAtomicMetric implements Metric {
 
     @Override
     public String getCurrentMetricState() {
-        return getName() + ": " + value;
+        return type.shortName() + ": " + value;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
index 1f71542..b995267 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
@@ -88,14 +88,14 @@ public class OverAllQueryMetrics {
         resultSetTimeMS.change(resultSetWatch.getElapsedTimeInMs());
     }
 
-    public Map<String, Long> publish() {
-        Map<String, Long> metricsForPublish = new HashMap<>();
-        metricsForPublish.put(numParallelScans.getName(), numParallelScans.getValue());
-        metricsForPublish.put(wallClockTimeMS.getName(), wallClockTimeMS.getValue());
-        metricsForPublish.put(resultSetTimeMS.getName(), resultSetTimeMS.getValue());
-        metricsForPublish.put(queryTimedOut.getName(), queryTimedOut.getValue());
-        metricsForPublish.put(queryFailed.getName(), queryFailed.getValue());
-        metricsForPublish.put(cacheRefreshedDueToSplits.getName(), cacheRefreshedDueToSplits.getValue());
+    public Map<MetricType, Long> publish() {
+        Map<MetricType, Long> metricsForPublish = new HashMap<>();
+        metricsForPublish.put(numParallelScans.getMetricType(), numParallelScans.getValue());
+        metricsForPublish.put(wallClockTimeMS.getMetricType(), wallClockTimeMS.getValue());
+        metricsForPublish.put(resultSetTimeMS.getMetricType(), resultSetTimeMS.getValue());
+        metricsForPublish.put(queryTimedOut.getMetricType(), queryTimedOut.getValue());
+        metricsForPublish.put(queryFailed.getMetricType(), queryFailed.getValue());
+        metricsForPublish.put(cacheRefreshedDueToSplits.getMetricType(), cacheRefreshedDueToSplits.getValue());
         return metricsForPublish;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
index e6c6be2..0e9b27f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
@@ -68,19 +68,19 @@ public class ReadMetricQueue {
     /**
      * @return map of table name -> list of pair of (metric name, metric value)
      */
-    public Map<String, Map<String, Long>> aggregate() {
-        Map<String, Map<String, Long>> publishedMetrics = new HashMap<>();
+    public Map<String, Map<MetricType, Long>> aggregate() {
+        Map<String, Map<MetricType, Long>> publishedMetrics = new HashMap<>();
         for (Entry<MetricKey, Queue<CombinableMetric>> entry : metricsMap.entrySet()) {
             String tableNameToPublish = entry.getKey().tableName;
             Collection<CombinableMetric> metrics = entry.getValue();
             if (metrics.size() > 0) {
                 CombinableMetric m = combine(metrics);
-                Map<String, Long> map = publishedMetrics.get(tableNameToPublish);
+                Map<MetricType, Long> map = publishedMetrics.get(tableNameToPublish);
                 if (map == null) {
                     map = new HashMap<>();
                     publishedMetrics.put(tableNameToPublish, map);
                 }
-                map.put(m.getName(), m.getValue());
+                map.put(m.getMetricType(), m.getValue());
             }
         }
         return publishedMetrics;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index fe58c2a..b553bf0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -36,6 +36,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
@@ -69,6 +70,7 @@ import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.monitoring.GlobalMetric;
+import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -88,10 +90,13 @@ import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.transaction.TransactionFactory;
 
+import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Maps.EntryTransformer;
 
 /**
  *
@@ -1313,13 +1318,31 @@ public class PhoenixRuntime {
         return GlobalClientMetrics.isMetricsEnabled();
     }
     
+    private static Map<String, Long> createMetricMap(Map<MetricType, Long> metricInfoMap) {
+    	Map<String, Long> metricMap = Maps.newHashMapWithExpectedSize(metricInfoMap.size());
+    	for (Entry<MetricType, Long> entry : metricInfoMap.entrySet()) {
+    		metricMap.put(entry.getKey().shortName(), entry.getValue());
+    	}
+    	return metricMap;
+	}
+    
+	private static Map<String, Map<String, Long>> transformMetrics(Map<String, Map<MetricType, Long>> metricMap) {
+		Function<Map<MetricType, Long>, Map<String, Long>> func = new Function<Map<MetricType, Long>, Map<String, Long>>() {
+			@Override
+			public Map<String, Long> apply(Map<MetricType, Long> map) {
+				return createMetricMap(map);
+			}
+		};
+		return Maps.transformValues(metricMap, func);
+	}
+    
     /**
      * Method to expose the metrics associated with performing reads using the passed result set. A typical pattern is:
      * 
      * <pre>
      * {@code
-     * Map<String, Map<String, Long>> overAllQueryMetrics = null;
-     * Map<String, Map<String, Long>> requestReadMetrics = null;
+     * Map<String, Map<MetricType, Long>> overAllQueryMetrics = null;
+     * Map<String, Map<MetricType, Long>> requestReadMetrics = null;
      * try (ResultSet rs = stmt.executeQuery()) {
      *    while(rs.next()) {
      *      .....
@@ -1335,10 +1358,16 @@ public class PhoenixRuntime {
      * @return a map of (table name) -> (map of (metric name) -> (metric value))
      * @throws SQLException
      */
-    public static Map<String, Map<String, Long>> getRequestReadMetrics(ResultSet rs) throws SQLException {
+    public static Map<String, Map<MetricType, Long>> getRequestReadMetricInfo(ResultSet rs) throws SQLException {
         PhoenixResultSet resultSet = rs.unwrap(PhoenixResultSet.class);
         return resultSet.getReadMetrics();
     }
+    
+    @Deprecated 
+    // use getRequestReadMetricInfo
+    public static Map<String, Map<String, Long>> getRequestReadMetrics(ResultSet rs) throws SQLException {
+        return transformMetrics(getRequestReadMetricInfo(rs));
+    }
 
     /**
      * Method to expose the overall metrics associated with executing a query via phoenix. A typical pattern of
@@ -1346,8 +1375,8 @@ public class PhoenixRuntime {
      * 
      * <pre>
      * {@code
-     * Map<String, Map<String, Long>> overAllQueryMetrics = null;
-     * Map<String, Map<String, Long>> requestReadMetrics = null;
+     * Map<String, Map<MetricType, Long>> overAllQueryMetrics = null;
+     * Map<String, Map<MetricType, Long>> requestReadMetrics = null;
      * try (ResultSet rs = stmt.executeQuery()) {
      *    while(rs.next()) {
      *      .....
@@ -1363,10 +1392,16 @@ public class PhoenixRuntime {
      * @return a map of metric name -> metric value
      * @throws SQLException
      */
-    public static Map<String, Long> getOverAllReadRequestMetrics(ResultSet rs) throws SQLException {
+    public static Map<MetricType, Long> getOverAllReadRequestMetricInfo(ResultSet rs) throws SQLException {
         PhoenixResultSet resultSet = rs.unwrap(PhoenixResultSet.class);
         return resultSet.getOverAllRequestReadMetrics();
     }
+    
+    @Deprecated
+    // use getOverAllReadRequestMetricInfo
+    public static Map<String, Long> getOverAllReadRequestMetrics(ResultSet rs) throws SQLException {
+        return createMetricMap(getOverAllReadRequestMetricInfo(rs));
+    }
 
     /**
      * Method to expose the metrics associated with sending over mutations to HBase. These metrics are updated when
@@ -1375,8 +1410,8 @@ public class PhoenixRuntime {
      * 
      * <pre>
      * {@code
-     * Map<String, Map<String, Long>> mutationWriteMetrics = null;
-     * Map<String, Map<String, Long>> mutationReadMetrics = null;
+     * Map<String, Map<MetricType, Long>> mutationWriteMetrics = null;
+     * Map<String, Map<MetricType, Long>> mutationReadMetrics = null;
      * try (Connection conn = DriverManager.getConnection(url)) {
      *    conn.createStatement.executeUpdate(dml1);
      *    ....
@@ -1396,10 +1431,16 @@ public class PhoenixRuntime {
      * @return a map of (table name) -> (map of (metric name) -> (metric value))
      * @throws SQLException
      */
-    public static Map<String, Map<String, Long>> getWriteMetricsForMutationsSinceLastReset(Connection conn) throws SQLException {
+    public static Map<String, Map<MetricType, Long>> getWriteMetricInfoForMutationsSinceLastReset(Connection conn) throws SQLException {
         PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
         return pConn.getMutationMetrics();
     }
+    
+    @Deprecated
+    // use getWriteMetricInfoForMutationsSinceLastReset
+    public static Map<String, Map<String, Long>> getWriteMetricsForMutationsSinceLastReset(Connection conn) throws SQLException {
+        return transformMetrics(getWriteMetricInfoForMutationsSinceLastReset(conn));
+    }
 
     /**
      * Method to expose the read metrics associated with executing a dml statement. These metrics are updated when
@@ -1408,8 +1449,8 @@ public class PhoenixRuntime {
      * 
      * <pre>
      * {@code
-     * Map<String, Map<String, Long>> mutationWriteMetrics = null;
-     * Map<String, Map<String, Long>> mutationReadMetrics = null;
+     * Map<String, Map<MetricType, Long>> mutationWriteMetrics = null;
+     * Map<String, Map<MetricType, Long>> mutationReadMetrics = null;
      * try (Connection conn = DriverManager.getConnection(url)) {
      *    conn.createStatement.executeUpdate(dml1);
      *    ....
@@ -1428,10 +1469,16 @@ public class PhoenixRuntime {
      * @return  a map of (table name) -> (map of (metric name) -> (metric value))
      * @throws SQLException
      */
-    public static Map<String, Map<String, Long>> getReadMetricsForMutationsSinceLastReset(Connection conn) throws SQLException {
+    public static Map<String, Map<MetricType, Long>> getReadMetricInfoForMutationsSinceLastReset(Connection conn) throws SQLException {
         PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
         return pConn.getReadMetrics();
     }
+    
+    @Deprecated
+    // use getReadMetricInfoForMutationsSinceLastReset 
+    public static Map<String, Map<String, Long>> getReadMetricsForMutationsSinceLastReset(Connection conn) throws SQLException {
+        return transformMetrics(getReadMetricInfoForMutationsSinceLastReset(conn));
+    }
 
     /**
      * Reset the read metrics collected in the result set.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53a76635/phoenix-core/src/test/java/org/apache/phoenix/metrics/MetricTypeTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/metrics/MetricTypeTest.java b/phoenix-core/src/test/java/org/apache/phoenix/metrics/MetricTypeTest.java
new file mode 100644
index 0000000..5b0909a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/metrics/MetricTypeTest.java
@@ -0,0 +1,42 @@
+package org.apache.phoenix.metrics;
+
+import static org.junit.Assert.fail;
+
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.MetricType;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class MetricTypeTest {
+
+	@Test
+    public void testUniqueShortNames() throws Exception {
+		Map<String, MetricType> shortNameMap = Maps.newHashMapWithExpectedSize(MetricType.values().length);
+        for (MetricType type : MetricType.values()) {
+        	MetricType oldMetricType = shortNameMap.put(type.shortName(), type);
+        	if (oldMetricType!=null) {
+				fail("Metric short names should be unique found duplicates for " + type.name() + " and "
+						+ oldMetricType.name());
+        	}
+        }
+    }
+}