Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/03/06 01:48:17 UTC

phoenix git commit: PHOENIX-1452 Add Phoenix client-side logging and capture resource utilization metrics

Repository: phoenix
Updated Branches:
  refs/heads/master 9a546b9c8 -> c65d5ec49


PHOENIX-1452 Add Phoenix client-side logging and capture resource utilization metrics


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c65d5ec4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c65d5ec4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c65d5ec4

Branch: refs/heads/master
Commit: c65d5ec4908a3fb5a737338c92c6928025b1e775
Parents: 9a546b9
Author: Samarth <sa...@salesforce.com>
Authored: Thu Mar 5 16:47:32 2015 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Thu Mar 5 16:47:32 2015 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/PhoenixMetricsIT.java       | 151 +++++++++++++++++++
 .../apache/phoenix/execute/MutationState.java   |  34 ++---
 .../phoenix/iterate/BaseResultIterators.java    |  11 +-
 .../phoenix/iterate/ParallelIterators.java      |   7 +-
 .../phoenix/iterate/ScanningResultIterator.java |  24 ++-
 .../phoenix/iterate/SpoolingResultIterator.java |   9 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  16 +-
 .../java/org/apache/phoenix/job/JobManager.java | 141 ++++++++++++++---
 .../phoenix/mapreduce/CsvBulkLoadTool.java      |  19 ++-
 .../phoenix/memory/GlobalMemoryManager.java     |   8 +-
 .../org/apache/phoenix/monitoring/Counter.java  |  85 +++++++++++
 .../org/apache/phoenix/monitoring/Metric.java   |  64 ++++++++
 .../phoenix/monitoring/PhoenixMetrics.java      | 118 +++++++++++++++
 .../phoenix/monitoring/SizeStatistic.java       |  78 ++++++++++
 .../phoenix/query/BaseQueryServicesImpl.java    |   3 +-
 .../org/apache/phoenix/query/QueryServices.java |   1 +
 .../phoenix/query/QueryServicesOptions.java     |  18 ++-
 .../org/apache/phoenix/util/PhoenixRuntime.java |  11 +-
 18 files changed, 734 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
new file mode 100644
index 0000000..856480a
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.MUTATION_COUNT;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.NUM_SPOOL_FILE;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_COUNT;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_TIMEOUT;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.REJECTED_TASK_COUNT;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.QUERY_TIME;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_EXECUTION_TIME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+
+import org.apache.phoenix.monitoring.Metric;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.Test;
+
+public class PhoenixMetricsIT extends BaseHBaseManagedTimeIT {
+    
+    @Test
+    public void testResetPhoenixMetrics() {
+        resetMetrics();
+        for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) {
+            assertEquals(0, m.getTotalSum());
+            assertEquals(0, m.getNumberOfSamples());
+        }
+    }
+    
+    @Test
+    public void testPhoenixMetricsForQueries() throws Exception {
+        createTableAndInsertValues("T", true);
+        resetMetrics(); // we want to count metrics related only to the below query
+        Connection conn = DriverManager.getConnection(getUrl());
+        String query = "SELECT * FROM T";
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        while (rs.next()) {
+            rs.getString(1);
+            rs.getString(2);
+        }
+        assertEquals(1, PARALLEL_SCANS.getMetric().getTotalSum());
+        assertEquals(1, QUERY_COUNT.getMetric().getTotalSum());
+        assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
+        assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
+        assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
+        assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
+        assertEquals(0, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+        assertEquals(0, MUTATION_BYTES.getMetric().getTotalSum());
+        assertEquals(0, MUTATION_COMMIT_TIME.getMetric().getTotalSum());
+        
+        assertTrue(SCAN_BYTES.getMetric().getTotalSum() > 0);
+        assertTrue(QUERY_TIME.getMetric().getTotalSum() > 0);
+        assertTrue(TASK_END_TO_END_TIME.getMetric().getTotalSum() > 0);
+        assertTrue(TASK_EXECUTION_TIME.getMetric().getTotalSum() > 0);
+    }
+    
+    @Test
+    public void testPhoenixMetricsForMutations() throws Exception {
+        createTableAndInsertValues("T", true);
+        assertEquals(10, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+        assertEquals(10, MUTATION_COUNT.getMetric().getTotalSum());
+        assertTrue(MUTATION_BYTES.getMetric().getTotalSum() > 0);
+        assertTrue(MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
+        assertEquals(0, PARALLEL_SCANS.getMetric().getTotalSum());
+        assertEquals(0, QUERY_COUNT.getMetric().getTotalSum());
+        assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
+        assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
+        assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
+        assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
+    }
+    
+    
+    @Test
+    public void testPhoenixMetricsForUpsertSelect() throws Exception { 
+        createTableAndInsertValues("T", true);
+        resetMetrics();
+        String ddl = "CREATE TABLE T2 (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+        resetMetrics();
+        String dml = "UPSERT INTO T2 (K, V) SELECT K, V FROM T";
+        conn.createStatement().executeUpdate(dml);
+        conn.commit();
+        assertEquals(10, MUTATION_BATCH_SIZE.getMetric().getTotalSum());
+        assertEquals(1, MUTATION_COUNT.getMetric().getTotalSum());
+        assertEquals(1, PARALLEL_SCANS.getMetric().getTotalSum());
+        assertEquals(0, QUERY_TIME.getMetric().getTotalSum());
+        assertTrue(SCAN_BYTES.getMetric().getTotalSum() > 0);
+        assertTrue(MUTATION_BYTES.getMetric().getTotalSum() > 0);
+        assertTrue(MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0);
+        assertEquals(0, QUERY_COUNT.getMetric().getTotalSum());
+        assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum());
+        assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum());
+        assertEquals(0, FAILED_QUERY.getMetric().getTotalSum());
+        assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum());
+    }
+    
+    private static void resetMetrics() {
+        for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) {
+            m.reset();
+        }
+    }
+    
+    private static void createTableAndInsertValues(String tableName, boolean resetMetricsAfterTableCreate) throws Exception {
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+        if (resetMetricsAfterTableCreate) {
+            resetMetrics();
+        }
+        // executing 10 upserts/mutations.
+        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        for (int i = 1; i <= 10; i++) {
+            stmt.setString(1, "key" + i);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+        conn.commit();
+    }
+    
+    
+    
+}
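
These metrics are process-wide singletons, which is why the IT above resets them before each measurement. A caller that wants per-interval numbers outside of a test can take the same kind of snapshot through the public API added in this commit. A minimal sketch (the wrapper class is hypothetical; only PhoenixRuntime.getInternalPhoenixMetrics() and the Metric methods come from this patch):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.phoenix.monitoring.Metric;
    import org.apache.phoenix.util.PhoenixRuntime;

    public final class MetricsSnapshot {
        private MetricsSnapshot() {}

        // Capture the cumulative total of every internal Phoenix metric, then reset
        // so the next snapshot covers only the following interval.
        public static Map<String, Long> snapshotAndReset() {
            Map<String, Long> totals = new HashMap<String, Long>();
            for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) {
                totals.put(m.getName(), m.getTotalSum());
                m.reset();
            }
            return totals;
        }
    }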

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 04626a6..b98d705 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -23,10 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -41,6 +38,7 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.monitoring.PhoenixMetrics;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
@@ -64,6 +62,9 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_COMMIT_TIME;
 
 /**
  * 
@@ -81,7 +82,7 @@ public class MutationState implements SQLCloseable {
     private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
     private long sizeOffset;
     private int numRows = 0;
-
+    
     public MutationState(int maxSize, PhoenixConnection connection) {
         this(maxSize,connection,0);
     }
@@ -337,19 +338,15 @@ public class MutationState implements SQLCloseable {
     private static void logMutationSize(HTableInterface htable, List<Mutation> mutations, PhoenixConnection connection) {
         long byteSize = 0;
         int keyValueCount = 0;
-        for (Mutation mutation : mutations) {
-            if (mutation.getFamilyCellMap() != null) { // Not a Delete of the row
-                for (Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
-                    if (entry.getValue() != null) {
-                        for (Cell kv : entry.getValue()) {
-                            byteSize += CellUtil.estimatedSizeOf(kv);
-                            keyValueCount++;
-                        }
-                    }
-                }
+        if (PhoenixMetrics.isMetricsEnabled() || logger.isDebugEnabled()) {
+            for (Mutation mutation : mutations) {
+                byteSize += mutation.heapSize();
+            }
+            MUTATION_BYTES.update(byteSize);
+            if (logger.isDebugEnabled()) {
+                logger.debug(LogUtil.addCustomAnnotations("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + keyValueCount + " key values of total size " + byteSize + " bytes", connection));
             }
         }
-        logger.debug(LogUtil.addCustomAnnotations("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + keyValueCount + " key values of total size " + byteSize + " bytes", connection));
     }
     
     @SuppressWarnings("deprecation")
@@ -418,13 +415,16 @@ public class MutationState implements SQLCloseable {
                     SQLException sqlE = null;
                     HTableInterface hTable = connection.getQueryServices().getTable(htableName);
                     try {
-                        if (logger.isDebugEnabled()) logMutationSize(hTable, mutations, connection);
+                        logMutationSize(hTable, mutations, connection);
+                        MUTATION_BATCH_SIZE.update(mutations.size());
                         long startTime = System.currentTimeMillis();
                         child.addTimelineAnnotation("Attempt " + retryCount);
                         hTable.batch(mutations);
                         child.stop();
+                        long duration = System.currentTimeMillis() - startTime;
+                        MUTATION_COMMIT_TIME.update(duration);
                         shouldRetry = false;
-                        if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of  " + mutations.size() + " mutations into " + table.getName().getString() + ": " + (System.currentTimeMillis() - startTime) + " ms", connection));
+                        if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of  " + mutations.size() + " mutations into " + table.getName().getString() + ": " + duration + " ms", connection));
                         committedList.add(entry);
                     } catch (Exception e) {
                         SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 2285fec..a120143 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.iterate;
 
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_TIMEOUT;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
 import java.sql.SQLException;
@@ -101,7 +103,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     protected final String scanId;
     // TODO: too much nesting here - breakup into new classes.
     private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures;
-
     
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
         @Override
@@ -519,7 +520,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         SQLException toThrow = null;
         // Get query time out from Statement and convert from seconds back to milliseconds
         int queryTimeOut = context.getStatement().getQueryTimeout() * 1000;
-        long maxQueryEndTime = System.currentTimeMillis() + queryTimeOut;
+        final long startTime = System.currentTimeMillis();
+        final long maxQueryEndTime = startTime + queryTimeOut;
         try {
             submitWork(scans, futures, allIterators, splits.size());
             boolean clearedCache = false;
@@ -575,10 +577,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 }
                 addIterator(iterators, concatIterators);
             }
-
             success = true;
             return iterators;
         } catch (TimeoutException e) {
+            QUERY_TIMEOUT.increment();
             // thrown when a thread times out waiting for the future.get() call to return
             toThrow = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT)
                     .setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms")
@@ -612,6 +614,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 }
             } finally {
                 if (toThrow != null) {
+                    FAILED_QUERY.increment();
                     throw toThrow;
                 }
             }
@@ -678,7 +681,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     abstract protected String getName();    
     abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
             List<PeekingResultIterator> allIterators, int estFlattenedSize);
-
+    
     @Override
     public int size() {
         return this.scans.size();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 62af19a..b74919b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.iterate;
 
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_SCANS;
+
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
@@ -34,8 +36,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
-
-
 /**
  *
  * Class that parallelizes the scan over a table using the ExecutorService provided.  Each region of the table will be scanned in parallel with
@@ -53,7 +53,7 @@ public class ParallelIterators extends BaseResultIterators {
             throws SQLException {
         super(plan, perScanLimit);
         this.iteratorFactory = iteratorFactory;
-    }
+    }   
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
@@ -78,6 +78,7 @@ public class ParallelIterators extends BaseResultIterators {
         // Shuffle so that we start execution across many machines
         // before we fill up the thread pool
         Collections.shuffle(scanLocations);
+        PARALLEL_SCANS.update(scanLocations.size());
         for (ScanLocator scanLocation : scanLocations) {
             final Scan scan = scanLocation.getScan();
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 8aa9a2d..fd65d0c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -17,21 +17,24 @@
  */
 package org.apache.phoenix.iterate;
 
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SCAN_BYTES;
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
-
+import org.apache.phoenix.monitoring.PhoenixMetrics;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ServerUtil;
 
-
 public class ScanningResultIterator implements ResultIterator {
     private final ResultScanner scanner;
-    
     public ScanningResultIterator(ResultScanner scanner) {
         this.scanner = scanner;
     }
@@ -45,6 +48,7 @@ public class ScanningResultIterator implements ResultIterator {
     public Tuple next() throws SQLException {
         try {
             Result result = scanner.next();
+            calculateScanSize(result);
             // TODO: use ResultTuple.setResult(result)
             // Need to create a new one if holding on to it (i.e. OrderedResultIterator)
             return result == null ? null : new ResultTuple(result);
@@ -61,4 +65,18 @@ public class ScanningResultIterator implements ResultIterator {
 	public String toString() {
 		return "ScanningResultIterator [scanner=" + scanner + "]";
 	}
+	
+	private static void calculateScanSize(Result result) {
+	    if (PhoenixMetrics.isMetricsEnabled()) {
+	        if (result != null) {
+	            Cell[] cells = result.rawCells();
+	            long scanResultSize = 0;
+	            for (Cell cell : cells) {
+	                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+	                scanResultSize += kv.heapSize();
+	            }
+	            SCAN_BYTES.update(scanResultSize);
+	        }
+	    }
+	}
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 71ad7d7..63d3761 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.iterate;
 
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.NUM_SPOOL_FILE;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SPOOL_FILE_SIZE;
+
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -43,8 +46,6 @@ import org.apache.phoenix.util.ResultUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TupleUtil;
 
-
-
 /**
  *
  * Result iterator that spools the results of a scan to disk once an in-memory threshold has been reached.
@@ -55,7 +56,7 @@ import org.apache.phoenix.util.TupleUtil;
  */
 public class SpoolingResultIterator implements PeekingResultIterator {
     private final PeekingResultIterator spoolFrom;
-
+    
     public static class SpoolingResultIteratorFactory implements ParallelIteratorFactory {
         private final QueryServices services;
 
@@ -115,6 +116,8 @@ public class SpoolingResultIterator implements PeekingResultIterator {
                 chunk.resize(data.length);
                 spoolFrom = new InMemoryResultIterator(data, chunk);
             } else {
+                NUM_SPOOL_FILE.increment();
+                SPOOL_FILE_SIZE.update(spoolTo.getFile().length());
                 spoolFrom = new OnDiskResultIterator(spoolTo.getFile());
                 if (spoolTo.getFile() != null) {
                     spoolTo.getFile().deleteOnExit();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 4a23ab7..746c0b7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -17,6 +17,10 @@
  */
 package org.apache.phoenix.jdbc;
 
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.MUTATION_COUNT;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_COUNT;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.QUERY_TIME;
+
 import java.io.IOException;
 import java.io.Reader;
 import java.sql.ParameterMetaData;
@@ -126,16 +130,12 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ServerUtil;
-import org.cloudera.htrace.Sampler;
-import org.cloudera.htrace.TraceScope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
-
-
 /**
  * 
  * JDBC Statement implementation of Phoenix.
@@ -225,11 +225,13 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
     }
     
     protected PhoenixResultSet executeQuery(final CompilableStatement stmt) throws SQLException {
+        QUERY_COUNT.increment();
         try {
             return CallRunner.run(
                 new CallRunner.CallableThrowable<PhoenixResultSet, SQLException>() {
                 @Override
                     public PhoenixResultSet call() throws SQLException {
+                    final long startTime = System.currentTimeMillis();
                     try {
                         QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
                         plan = connection.getQueryServices().getOptimizer().optimize(
@@ -255,6 +257,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                             throw (SQLException) e.getCause();
                         }
                         throw e;
+                    } finally {
+                        // Regardless of whether the query was successfully handled or not, 
+                        // update the time spent so far. If needed, we can separate out the
+                        // success times and failure times.
+                        QUERY_TIME.update(System.currentTimeMillis() - startTime);
                     }
                 }
                 }, PhoenixContextExecutor.inContext());
@@ -270,6 +277,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 SQLExceptionCode.READ_ONLY_CONNECTION).
                 build().buildException();
         }
+	    MUTATION_COUNT.increment();
         try {
             return CallRunner
                     .run(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
index de10042..35c95c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
@@ -17,10 +17,18 @@
  */
 package org.apache.phoenix.job;
 
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.REJECTED_TASK_COUNT;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.TASK_COUNT;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_QUEUE_WAIT_TIME;
+
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
@@ -29,7 +37,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
  * 
  * Thread pool executor that executes scans in parallel
@@ -55,35 +62,49 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
         public Object getJobId();
     }
 
-    public static ThreadPoolExecutor createThreadPoolExec(int keepAliveMs, int size, int queueSize) {
+    public static ThreadPoolExecutor createThreadPoolExec(int keepAliveMs, int size, int queueSize, boolean useInstrumentedThreadPool) {
         BlockingQueue<Runnable> queue;
         if (queueSize == 0) {
             queue = new SynchronousQueue<Runnable>(); // Specialized for 0 length.
         } else {
             queue = new JobManager<Runnable>(queueSize);
         }
-
+        String name = "phoenix-" + PHOENIX_POOL_INDEX.getAndIncrement();
         ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                .setNameFormat("phoenix-" + PHOENIX_POOL_INDEX.getAndIncrement() + "-thread-%s")
+                .setNameFormat(name + "-thread-%s")
                 .setDaemon(true)
                 .setThreadFactory(
                         new ContextClassLoaderThreadFactory(JobManager.class.getClassLoader()))
                 .build();
-        // For thread pool, set core threads = max threads -- we don't ever want to exceed core threads, but want to go up to core threads *before* using the queue.
-        ThreadPoolExecutor exec = new ThreadPoolExecutor(size, size, keepAliveMs, TimeUnit.MILLISECONDS, queue, threadFactory) {
-            @Override
-            protected <T> RunnableFuture<T> newTaskFor(Callable<T> call) {
-                // Override this so we can create a JobFutureTask so we can extract out the parentJobId (otherwise, in the default FutureTask, it is private). 
-                return new JobFutureTask<T>(call);
-            }
-    
-            @Override
-            protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
-                return new JobFutureTask<T>(runnable, value);
-            }
-            
-        };
+        ThreadPoolExecutor exec;
+        if (useInstrumentedThreadPool) {
+            // For thread pool, set core threads = max threads -- we don't ever want to exceed core threads, but want to go up to core threads *before* using the queue.
+            exec = new InstrumentedThreadPoolExecutor(name, size, size, keepAliveMs, TimeUnit.MILLISECONDS, queue, threadFactory) {
+                @Override
+                protected <T> RunnableFuture<T> newTaskFor(Callable<T> call) {
+                    return new InstrumentedJobFutureTask<T>(call);
+                }
+        
+                @Override
+                protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+                    return new InstrumentedJobFutureTask<T>(runnable, value);
+                }
+            };
+        } else {
+            // For thread pool, set core threads = max threads -- we don't ever want to exceed core threads, but want to go up to core threads *before* using the queue.
+            exec = new ThreadPoolExecutor(size, size, keepAliveMs, TimeUnit.MILLISECONDS, queue, threadFactory) {
+                @Override
+                protected <T> RunnableFuture<T> newTaskFor(Callable<T> call) {
+                    // Override this so we can create a JobFutureTask so we can extract out the parentJobId (otherwise, in the default FutureTask, it is private). 
+                    return new JobFutureTask<T>(call);
+                }
         
+                @Override
+                protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+                    return new JobFutureTask<T>(runnable, value);
+                }
+            };
+        }
         exec.allowCoreThreadTimeOut(true); // ... and allow core threads to time out.  This just keeps things clean when idle, and is nice for ftests modes, etc., where we'd especially like these not to linger.
         return exec;
     }
@@ -117,8 +138,47 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
             return jobId;
         }
     }
+    
+    /**
+     * Instrumented version of {@link JobFutureTask} that measures time spent by a task at various stages in the queue
+     * and when executed.
+     */
+    private static class InstrumentedJobFutureTask<T> extends JobFutureTask<T> {
 
+        /*
+         * Time at which the task was submitted to the executor.
+         */
+        private final long taskSubmissionTime;
 
+        // Time at which the task is about to be executed. 
+        private long taskExecutionStartTime;
+
+        public InstrumentedJobFutureTask(Runnable r, T t) {
+            super(r, t);
+            this.taskSubmissionTime = System.currentTimeMillis();
+        }
+
+        public InstrumentedJobFutureTask(Callable<T> c) {
+            super(c);
+            this.taskSubmissionTime = System.currentTimeMillis();
+        }
+        
+        @Override
+        public void run() {
+            this.taskExecutionStartTime = System.currentTimeMillis();
+            super.run();
+        }
+        
+        public long getTaskSubmissionTime() {
+            return taskSubmissionTime;
+        }
+        
+        public long getTaskExecutionStartTime() {
+            return taskExecutionStartTime;
+        }
+
+    }
+    
     /**
      * Delegating callable implementation that preserves the parentJobId and sets up thread tracker stuff before delegating to the actual command. 
      */
@@ -151,5 +211,50 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
             return t;
         }
     }
+    
+    /**
+     * Thread pool executor that instruments the various characteristics of the backing pool of threads and queue. This
+     * executor assumes that all the tasks handled are of type {@link JobManager.InstrumentedJobFutureTask}
+     */
+    private static class InstrumentedThreadPoolExecutor extends ThreadPoolExecutor {
+
+        private final RejectedExecutionHandler rejectedExecHandler = new RejectedExecutionHandler() {
+            @Override
+            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                REJECTED_TASK_COUNT.increment();
+                throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString());
+            }
+        };
+
+        public InstrumentedThreadPoolExecutor(String threadPoolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+            setRejectedExecutionHandler(rejectedExecHandler);
+        }
+
+        @Override
+        public void execute(Runnable task) {
+            TASK_COUNT.increment();
+            super.execute(task);
+        }
+
+        @Override
+        protected void beforeExecute(Thread worker, Runnable task) {
+            InstrumentedJobFutureTask instrumentedTask = (InstrumentedJobFutureTask)task;
+            TASK_QUEUE_WAIT_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime());
+            super.beforeExecute(worker, instrumentedTask);
+        }
+
+        @Override
+        protected void afterExecute(Runnable task, Throwable t) {
+            InstrumentedJobFutureTask instrumentedTask = (InstrumentedJobFutureTask)task;
+            try {
+                super.afterExecute(instrumentedTask, t);
+            } finally {
+                TASK_EXECUTION_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskExecutionStartTime());
+                TASK_END_TO_END_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime());
+            }
+        }
+    }
 }
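
The new boolean parameter selects between the original pool and the instrumented one: the instrumented variant wraps each submitted task so that queue-wait, execution, and end-to-end times feed TASK_QUEUE_WAIT_TIME, TASK_EXECUTION_TIME, and TASK_END_TO_END_TIME, while TASK_COUNT and REJECTED_TASK_COUNT track submissions and rejections. A usage sketch (the sizing values below are assumptions, not Phoenix defaults):

    import java.util.concurrent.ExecutorService;

    import org.apache.phoenix.job.JobManager;

    public final class InstrumentedPoolExample {
        public static void main(String[] args) {
            ExecutorService executor = JobManager.createThreadPoolExec(
                    600000 /* keepAliveMs */, 10 /* threads */, 100 /* queue size */,
                    true /* useInstrumentedThreadPool */);
            // Tasks must go through submit()/execute() of this pool so they get wrapped
            // in InstrumentedJobFutureTask and the TASK_* metrics are recorded.
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    // work measured by the task metrics
                }
            });
            executor.shutdown();
        }
    }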
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index c92a3a3..31f8b42 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -29,9 +29,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -57,9 +54,13 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.job.JobManager;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.util.CSVCommonsLoader;
@@ -71,6 +72,10 @@ import org.apache.phoenix.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
 /**
  * Base tool for running MapReduce-based ingests of data.
  */
@@ -236,7 +241,13 @@ public class CsvBulkLoadTool extends Configured implements Tool {
         }
         
         List<Future<Boolean>> runningJobs = new ArrayList<Future<Boolean>>();
-        ExecutorService executor =  JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20);
+        boolean useInstrumentedPool = conn
+                .unwrap(PhoenixConnection.class)
+                .getQueryServices()
+                .getProps()
+                .getBoolean(QueryServices.METRICS_ENABLED,
+                        QueryServicesOptions.DEFAULT_IS_METRICS_ENABLED);
+        ExecutorService executor =  JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20, useInstrumentedPool);
         try{
 	        for (TargetTableRef table : tablesToBeLoaded) {
 	        	Path tablePath = new Path(outputPath, table.getPhysicalName());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
index 3a98067..02c1dea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
@@ -17,11 +17,12 @@
  */
 package org.apache.phoenix.memory;
 
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MEMORY_MANAGER_BYTES;
+import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MEMORY_WAIT_TIME;
+
 import org.apache.http.annotation.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-
 /**
  * 
  * Global memory manager to track course grained memory usage across all requests.
@@ -37,7 +38,6 @@ public class GlobalMemoryManager implements MemoryManager {
     private final int maxWaitMs;
     @GuardedBy("sync")
     private volatile long usedMemoryBytes;
-    
     public GlobalMemoryManager(long maxBytes, int maxWaitMs) {
         if (maxBytes <= 0) {
             throw new IllegalStateException("Total number of available bytes (" + maxBytes + ") must be greater than zero");
@@ -92,6 +92,8 @@ public class GlobalMemoryManager implements MemoryManager {
             }
             usedMemoryBytes += nBytes;
         }
+        MEMORY_WAIT_TIME.update(System.currentTimeMillis() - startTimeMs);
+        MEMORY_MANAGER_BYTES.update(nBytes);
         return nBytes;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
new file mode 100644
index 0000000..141294d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Incrementing only counter that keeps track of the 
+ * number of occurrences of something.
+ * 
+ */
+@ThreadSafe
+class Counter implements Metric {
+    
+    private final AtomicLong counter;
+    private final String name;
+    private final String description;
+    
+    public Counter(String name, String description) {
+        this.name = name;
+        this.description = description;
+        this.counter = new AtomicLong(0);
+    }
+    
+    public long increment() {
+        return counter.incrementAndGet();
+    }
+    
+    public long getCurrentCount() {
+        return counter.get();
+    }
+    
+    @Override
+    public String getName() {
+        return name;
+    }
+    
+    @Override
+    public String getDescription() {
+        return description;
+    }
+    
+    @Override
+    public void reset() {
+        counter.set(0);
+    }
+    
+    @Override
+    public String toString() {
+        return "Name: " + name + ", Current count: " + counter.get();
+    }
+    
+    @Override
+    public String getCurrentMetricState() {
+        return toString();
+    }
+
+    @Override
+    public long getNumberOfSamples() {
+        return getCurrentCount();
+    }
+
+    @Override
+    public long getTotalSum() {
+        return getCurrentCount();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
new file mode 100644
index 0000000..aef792c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+/**
+ * Interface that exposes the various internal phoenix metrics collected.
+ * Because metrics are dynamic in nature, it is not guaranteed that the
+ * state exposed will always be in sync with each other. One should use
+ * these metrics primarily for monitoring and debugging purposes. 
+ */
+public interface Metric {
+    
+    /**
+     * 
+     * @return Name of the metric
+     */
+    public String getName();
+    
+    /**
+     * 
+     * @return Description of the metric
+     */
+    public String getDescription();
+    
+    /**
+     * Reset the internal state. Typically called after
+     * metric information has been collected and a new
+     * phase of collection is being requested for the next
+     * interval.
+     */
+    public void reset();
+    
+    /**
+     * 
+     * @return String that represents the current state of the metric.
+     * Typically used to log the current state.
+     */
+    public String getCurrentMetricState();
+    
+    /**
+     * @return Number of samples collected since the last {@link #reset()} call.
+     */
+    public long getNumberOfSamples();
+    
+    /**
+     * @return Sum of the values of the metric sampled since the last {@link #reset()} call.
+     */
+    public long getTotalSum();
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
new file mode 100644
index 0000000..28e2f2e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+/**
+ * Central place where we keep track of all the internal
+ * phoenix metrics that we track.
+ * 
+ */
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.phoenix.query.QueryServicesOptions;
+
+public class PhoenixMetrics {
+    private static final boolean isMetricsEnabled = QueryServicesOptions.withDefaults().isMetricsEnabled();
+
+    public static boolean isMetricsEnabled() {
+        return isMetricsEnabled;
+    }
+
+    public enum SizeMetric {
+        MUTATION_BATCH_SIZE("CumulativeBatchSizesOfMutations", "Cumulative batch sizes of mutations"),
+        MUTATION_BYTES("CumulativeMutationSize", "Cumulative size of mutations in bytes"),
+        MUTATION_COMMIT_TIME("CumulativeMutationTime", "Cumulative time it took to send mutations"),
+        QUERY_TIME("QueryTime", "Cumulative query times"),
+        PARALLEL_SCANS("CumulativeNumberOfParallelScans", "Cumulative number of scans executed that were executed in parallel"),
+        SCAN_BYTES("CumulativeScanBytesSize", "Cumulative number of bytes read by scans"),
+        SPOOL_FILE_SIZE("CumulativeSpoolFilesSize", "Cumulative size of spool files created in bytes"),
+        MEMORY_MANAGER_BYTES("CumulativeBytesAllocated", "Cumulative number of bytes allocated by the memory manager"),
+        MEMORY_WAIT_TIME("CumulativeMemoryWaitTime", "Cumulative number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
+        TASK_QUEUE_WAIT_TIME("CumulativeTaskQueueWaitTime", "Cumulative time in milliseconds tasks had to wait in the queue of the thread pool executor"),
+        TASK_END_TO_END_TIME("CumulativeTaskEndToEndTime", "Cumulative time in milliseconds spent by tasks from creation to completion"),
+        TASK_EXECUTION_TIME("CumulativeTaskExecutionTime", "Cumulative time in milliseconds tasks took to execute");
+
+        private final SizeStatistic metric;
+
+        private SizeMetric(String metricName, String metricDescription) {
+            metric = new SizeStatistic(metricName, metricDescription);
+        }
+
+        public void update(long value) {
+            if (isMetricsEnabled) {
+                metric.add(value);
+            }
+        }
+        
+        // exposed for testing.
+        public Metric getMetric() {
+            return metric;
+        }
+        
+        @Override
+        public String toString() {
+            return metric.toString();
+        }
+    }
+
+    public enum CountMetric {
+        MUTATION_COUNT("NumMutationCounter", "Counter for number of mutation statements"),
+        QUERY_COUNT("NumQueryCounter", "Counter for number of queries"),
+        TASK_COUNT("NumberOfTasksCounter", "Counter for number of tasks submitted to the thread pool executor"),
+        REJECTED_TASK_COUNT("RejectedTasksCounter", "Counter for number of tasks that were rejected by the thread pool executor"),
+        QUERY_TIMEOUT("QueryTimeoutCounter", "Number of times query timed out"),
+        FAILED_QUERY("QueryFailureCounter", "Number of times query failed"),
+        NUM_SPOOL_FILE("NumSpoolFilesCounter", "Number of spool files created");
+
+        private final Counter metric;
+
+        private CountMetric(String metricName, String metricDescription) {
+            metric = new Counter(metricName, metricDescription);
+        }
+
+        public void increment() {
+            if (isMetricsEnabled) {
+                metric.increment();
+            }
+        }
+        
+        // exposed for testing.
+        public Metric getMetric() {
+            return metric;
+        }
+        
+        @Override
+        public String toString() {
+            return metric.toString();
+        }
+    }
+    
+    public static Collection<Metric> getMetrics() {
+        List<Metric> metrics = new ArrayList<>();
+        for (SizeMetric s : SizeMetric.values()) {
+            metrics.add(s.metric);
+        }
+        for (CountMetric s : CountMetric.values()) {
+            metrics.add(s.metric);
+        }
+        return metrics;
+    }
+
+}    
\ No newline at end of file
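
Both enums funnel every update through the static isMetricsEnabled flag, so when phoenix.query.metrics.enabled is false the calls added to the hot paths above reduce to a boolean check. A minimal sketch of recording and reading a metric (illustrative only; the class name is hypothetical, the enum constants and Metric methods come from this commit):

    import org.apache.phoenix.monitoring.Metric;
    import org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric;
    import org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric;

    public final class MetricUpdateExample {
        public static void main(String[] args) {
            SizeMetric.SCAN_BYTES.update(1024L); // no-op when metrics are disabled
            CountMetric.QUERY_COUNT.increment(); // likewise

            Metric scanBytes = SizeMetric.SCAN_BYTES.getMetric();
            System.out.println(scanBytes.getName() + ": total=" + scanBytes.getTotalSum()
                    + ", samples=" + scanBytes.getNumberOfSamples());
        }
    }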

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
new file mode 100644
index 0000000..9eca754
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * 
+ * Statistic that keeps track of the sum of long values that 
+ * could be used to represent a phoenix metric. For performance 
+ * reasons the internal state in this metric is not strictly covariant
+ * and hence should only be used for monitoring and debugging purposes. 
+ */
+class SizeStatistic implements Metric {
+    
+    private final AtomicLong total = new AtomicLong(0);
+    private final AtomicLong numSamples = new AtomicLong(0);
+    private final String name;
+    private final String description;
+    
+    public SizeStatistic(String name, String description) {
+        this.name = name;
+        this.description = description;
+    }
+    
+    @Override
+    public String getName() {
+        return name;
+    }
+    
+    @Override
+    public String getDescription() {
+        return description;
+    }   
+
+    @Override
+    public void reset() {
+        total.set(0);
+        numSamples.set(0);
+    }
+    
+    @Override
+    public String getCurrentMetricState() {
+        return "Name:" + description + ", Total: " + total.get() + ", Number of samples: " + numSamples.get();
+    }
+
+    @Override
+    public long getNumberOfSamples() {
+        return numSamples.get();
+    }
+
+    @Override
+    public long getTotalSum() {
+        return total.get();
+    }
+    
+    public long add(long value) {
+        // there is a race condition here but what the heck.
+        numSamples.incrementAndGet();
+        return total.addAndGet(value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
index 73cc3c2..898a919 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
@@ -44,7 +44,8 @@ public abstract class BaseQueryServicesImpl implements QueryServices {
         this.executor =  JobManager.createThreadPoolExec(
                 options.getKeepAliveMs(), 
                 options.getThreadPoolSize(), 
-                options.getQueueSize());
+                options.getQueueSize(),
+                options.isMetricsEnabled());
         this.memoryManager = new GlobalMemoryManager(
                 Runtime.getRuntime().maxMemory() * options.getMaxMemoryPerc() / 100,
                 options.getMaxMemoryWaitMs());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index e20d5ee..2eab5dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -152,6 +152,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String DELAY_FOR_SCHEMA_UPDATE_CHECK = "phoenix.schema.change.delay";
     public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells";
     public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls";
+    public static final String METRICS_ENABLED = "phoenix.query.metrics.enabled";
 
     /**
      * Get executor service used for parallel scans

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 0f9139f..8cd740a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -61,6 +61,7 @@ import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTR
 import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.METRICS_ENABLED;
 
 import java.util.Map.Entry;
 
@@ -181,7 +182,8 @@ public class QueryServicesOptions {
 
     // TODO Change this to true as part of PHOENIX-1543
     public static final boolean DEFAULT_AUTO_COMMIT = false;
-
+    public static final boolean DEFAULT_IS_METRICS_ENABLED = true;
+    
     private final Configuration config;
 
     private QueryServicesOptions(Configuration config) {
@@ -232,7 +234,8 @@ public class QueryServicesOptions {
             .setIfUnset(SCAN_RESULT_CHUNK_SIZE, DEFAULT_SCAN_RESULT_CHUNK_SIZE)
             .setIfUnset(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE)
             .setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK)
-            .setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK);
+            .setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK)
+            .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED);
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
@@ -431,7 +434,11 @@ public class QueryServicesOptions {
     public int getSpillableGroupByNumSpillFiles() {
         return config.getInt(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES);
     }
-
+    
+    public boolean isMetricsEnabled() {
+        return config.getBoolean(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED);
+    }
+    
     public QueryServicesOptions setMaxServerCacheTTLMs(int ttl) {
         return set(MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, ttl);
     }
@@ -500,5 +507,10 @@ public class QueryServicesOptions {
         config.setLong(DELAY_FOR_SCHEMA_UPDATE_CHECK, delayInMillis);
         return this;
     }
+    
+    public QueryServicesOptions setMetricsEnabled(boolean flag) {
+        config.setBoolean(METRICS_ENABLED, flag);
+        return this;
+    }
 
 }
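
Because PhoenixMetrics reads QueryServicesOptions.withDefaults() in a static initializer, the new phoenix.query.metrics.enabled flag is effectively a JVM-wide client setting, typically supplied through the client configuration (for example hbase-site.xml) before the first Phoenix class loads; it defaults to true. A sketch of the new accessors (illustrative only, operating on a fresh options instance):

    import org.apache.phoenix.query.QueryServicesOptions;

    public final class MetricsFlagExample {
        public static void main(String[] args) {
            QueryServicesOptions options = QueryServicesOptions.withDefaults();
            System.out.println("metrics enabled: " + options.isMetricsEnabled()); // true by default
            options.setMetricsEnabled(false);
            System.out.println("after setMetricsEnabled(false): " + options.isMetricsEnabled());
        }
    }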

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c65d5ec4/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 02a2776..b030510 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -30,6 +30,7 @@ import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -60,6 +61,8 @@ import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.monitoring.Metric;
+import org.apache.phoenix.monitoring.PhoenixMetrics;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -964,5 +967,11 @@ public class PhoenixRuntime {
         List<PColumn> pkColumns = table.getPKColumns();
         return new RowKeyColumnExpression(pkColumns.get(pkPosition), new RowKeyValueAccessor(pkColumns, pkPosition));
     }
-
+    
+    /**
+     * Exposes the various internal phoenix metrics. 
+     */
+    public static Collection<Metric> getInternalPhoenixMetrics() {
+        return PhoenixMetrics.getMetrics();
+    }
 }
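
getInternalPhoenixMetrics() is the public entry point to all of the data above; a monitoring hook could periodically log each metric's state, for example (illustrative only, the class name is hypothetical):

    import org.apache.phoenix.monitoring.Metric;
    import org.apache.phoenix.util.PhoenixRuntime;

    public final class LogPhoenixMetrics {
        public static void main(String[] args) {
            for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) {
                // e.g. "Name: NumQueryCounter, Current count: 42" for counters
                System.out.println(m.getCurrentMetricState());
            }
        }
    }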