You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/06/27 01:29:04 UTC

[2/4] phoenix git commit: PHOENIX-1819 Build a framework to capture and report phoenix client side request level metrics

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 5270277..bb4054b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -57,6 +57,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.job.JobManager;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -255,12 +256,9 @@ public class CsvBulkLoadTool extends Configured implements Tool {
         }
         
         List<Future<Boolean>> runningJobs = new ArrayList<Future<Boolean>>();
-        boolean useInstrumentedPool = conn
-                .unwrap(PhoenixConnection.class)
-                .getQueryServices()
-                .getProps()
-                .getBoolean(QueryServices.METRICS_ENABLED,
-                        QueryServicesOptions.DEFAULT_IS_METRICS_ENABLED);
+        boolean useInstrumentedPool = GlobalClientMetrics.isMetricsEnabled()
+                || conn.unwrap(PhoenixConnection.class).isRequestLevelMetricsEnabled();
+                        
         ExecutorService executor =
                 JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20, useInstrumentedPool);
         try{

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index eb6dc3d..b500a25 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
@@ -32,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
 import org.apache.phoenix.iterate.PeekingResultIterator;
@@ -40,6 +43,7 @@ import org.apache.phoenix.iterate.RoundRobinResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -100,8 +104,12 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
         final List<Scan> scans = pSplit.getScans();
         try {
             List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+            StatementContext ctx = queryPlan.getContext();
+            ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
+            String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
             for (Scan scan : scans) {
-                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(), scan);
+                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(),
+                        queryPlan.getTableRef(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName));
                 PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
                 iterators.add(peekingResultIterator);
             }
@@ -112,7 +120,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
             this.resultIterator = iterator;
             // Clone the row projector as it's not thread safe and would be used simultaneously by
             // multiple threads otherwise.
-            this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector().cloneIfNecessary(),queryPlan.getContext().getStatement());
+            this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
         } catch (SQLException e) {
             LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
             Throwables.propagate(e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
index 02c1dea..79b49c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
@@ -17,9 +17,6 @@
  */
 package org.apache.phoenix.memory;
 
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MEMORY_MANAGER_BYTES;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MEMORY_WAIT_TIME;
-
 import org.apache.http.annotation.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,8 +89,6 @@ public class GlobalMemoryManager implements MemoryManager {
             }
             usedMemoryBytes += nBytes;
         }
-        MEMORY_WAIT_TIME.update(System.currentTimeMillis() - startTimeMs);
-        MEMORY_MANAGER_BYTES.update(nBytes);
         return nBytes;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
new file mode 100644
index 0000000..796e8ba
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link Metric} backed by an {@link AtomicLong}, for metrics that are read or updated concurrently by multiple
+ * threads. Name and description are taken from the {@link MetricType} supplied at construction.
+ */
+public class AtomicMetric implements Metric {
+
+    private final MetricType type;
+    private final AtomicLong value = new AtomicLong(); // running total; AtomicLong() starts at 0
+
+    public AtomicMetric(MetricType type) {
+        this.type = type;
+    }
+
+    @Override
+    public String getName() {
+        return type.name(); // enum constant name of the metric type
+    }
+
+    @Override
+    public String getDescription() {
+        return type.description();
+    }
+
+    @Override
+    public long getValue() {
+        return value.get();
+    }
+
+    @Override
+    public void change(long delta) {
+        value.addAndGet(delta); // atomic add; the updated total is not needed here
+    }
+
+    @Override
+    public void increment() {
+        value.incrementAndGet(); // atomic +1
+    }
+
+    @Override
+    public String getCurrentMetricState() {
+        return getName() + ": " + value.get(); // "<NAME>: <current value>"
+    }
+
+    @Override
+    public void reset() {
+        value.set(0);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
new file mode 100644
index 0000000..7ebb0c1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+
+
+/**
+ * Interface for representing a metric that could be published and possibly combined with a metric of the same
+ * type.
+ */
+public interface CombinableMetric extends Metric {
+    /** @return textual form of this metric for publishing. */
+    String getPublishString();
+    /** Combines this metric with {@code metric} and returns the resulting metric. */
+    CombinableMetric combine(CombinableMetric metric);
+
+    public class NoOpRequestMetric implements CombinableMetric {
+        // Records nothing: mutators are no-ops and getters return 0 or the empty string.
+        public static final NoOpRequestMetric INSTANCE = new NoOpRequestMetric(); // final: callers must not replace it
+        private static final String EMPTY_STRING = "";
+
+        @Override
+        public String getName() {
+            return EMPTY_STRING;
+        }
+
+        @Override
+        public String getDescription() {
+            return EMPTY_STRING;
+        }
+
+        @Override
+        public long getValue() {
+            return 0;
+        }
+
+        @Override
+        public void change(long delta) {} // intentionally ignored
+
+        @Override
+        public void increment() {} // intentionally ignored
+
+        @Override
+        public String getCurrentMetricState() {
+            return EMPTY_STRING;
+        }
+
+        @Override
+        public void reset() {} // nothing to reset
+
+        @Override
+        public String getPublishString() {
+            return EMPTY_STRING;
+        }
+
+        @Override
+        public CombinableMetric combine(CombinableMetric metric) {
+            return INSTANCE; // the argument is ignored; combining with the no-op metric yields the no-op metric
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
new file mode 100644
index 0000000..fa6f7d3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class CombinableMetricImpl implements CombinableMetric {
+    // CombinableMetric that delegates every Metric call to a NonAtomicMetric of the given type.
+    private final Metric metric;
+
+    public CombinableMetricImpl(MetricType type) {
+        metric = new NonAtomicMetric(type);
+    }
+
+    @Override
+    public String getName() {
+        return metric.getName();
+    }
+
+    @Override
+    public String getDescription() {
+        return metric.getDescription();
+    }
+
+    @Override
+    public long getValue() {
+        return metric.getValue();
+    }
+
+    @Override
+    public void change(long delta) {
+        metric.change(delta);
+    }
+
+    @Override
+    public void increment() {
+        metric.increment();
+    }
+
+    @Override
+    public String getCurrentMetricState() {
+        return metric.getCurrentMetricState();
+    }
+
+    @Override
+    public void reset() {
+        metric.reset();
+    }
+
+    @Override
+    public String getPublishString() {
+        return getCurrentMetricState(); // published form is the same as the current-state string
+    }
+
+    @Override
+    public CombinableMetric combine(CombinableMetric metric) {
+        checkArgument(this.getClass().equals(metric.getClass())); // only metrics of the exact same class combine
+        this.metric.change(metric.getValue()); // parameter shadows the field: add the other metric's value to ours
+        return this; // this instance is mutated in place and returned
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
deleted file mode 100644
index 141294d..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Counter.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.monitoring;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Incrementing only counter that keeps track of the 
- * number of occurrences of something.
- * 
- */
-@ThreadSafe
-class Counter implements Metric {
-    
-    private final AtomicLong counter;
-    private final String name;
-    private final String description;
-    
-    public Counter(String name, String description) {
-        this.name = name;
-        this.description = description;
-        this.counter = new AtomicLong(0);
-    }
-    
-    public long increment() {
-        return counter.incrementAndGet();
-    }
-    
-    public long getCurrentCount() {
-        return counter.get();
-    }
-    
-    @Override
-    public String getName() {
-        return name;
-    }
-    
-    @Override
-    public String getDescription() {
-        return description;
-    }
-    
-    @Override
-    public void reset() {
-        counter.set(0);
-    }
-    
-    @Override
-    public String toString() {
-        return "Name: " + name + ", Current count: " + counter.get();
-    }
-    
-    @Override
-    public String getCurrentMetricState() {
-        return toString();
-    }
-
-    @Override
-    public long getNumberOfSamples() {
-        return getCurrentCount();
-    }
-
-    @Override
-    public long getTotalSum() {
-        return getCurrentCount();
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
new file mode 100644
index 0000000..a8f3bb4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.phoenix.query.QueryServicesOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Central place where we keep track of all the global client phoenix metrics. These metrics are different from
+ * {@link ReadMetricQueue} or {@link MutationMetricQueue} as they are collected at the client JVM level as opposed
+ * to the above two which are collected for every phoenix request. Updates through {@link #update(long)} and
+ * {@link #increment()} are dropped when global metrics are disabled in the default {@link QueryServicesOptions}.
+ */
+public enum GlobalClientMetrics {
+    
+    GLOBAL_MUTATION_BATCH_SIZE(MUTATION_BATCH_SIZE),
+    GLOBAL_MUTATION_BYTES(MUTATION_BYTES),
+    GLOBAL_MUTATION_COMMIT_TIME(MUTATION_COMMIT_TIME),
+    GLOBAL_QUERY_TIME(QUERY_TIME),
+    GLOBAL_NUM_PARALLEL_SCANS(NUM_PARALLEL_SCANS),
+    GLOBAL_SCAN_BYTES(SCAN_BYTES),
+    GLOBAL_SPOOL_FILE_SIZE(SPOOL_FILE_SIZE),
+    GLOBAL_MEMORY_CHUNK_BYTES(MEMORY_CHUNK_BYTES),
+    GLOBAL_MEMORY_WAIT_TIME(MEMORY_WAIT_TIME),
+    GLOBAL_TASK_QUEUE_WAIT_TIME(TASK_QUEUE_WAIT_TIME),
+    GLOBAL_TASK_END_TO_END_TIME(TASK_END_TO_END_TIME),
+    GLOBAL_TASK_EXECUTION_TIME(TASK_EXECUTION_TIME),
+    GLOBAL_MUTATION_SQL_COUNTER(MUTATION_SQL_COUNTER),
+    GLOBAL_SELECT_SQL_COUNTER(SELECT_SQL_COUNTER),
+    GLOBAL_TASK_EXECUTED_COUNTER(TASK_EXECUTED_COUNTER),
+    GLOBAL_REJECTED_TASK_COUNTER(TASK_REJECTED_COUNTER),
+    GLOBAL_QUERY_TIMEOUT_COUNTER(QUERY_TIMEOUT_COUNTER),
+    GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER),
+    GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER);
+    // Read once, at class initialization, from the default QueryServicesOptions; gates update() and increment().
+    private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
+    private final GlobalMetric metric; // one metric per enum constant, assigned once in the constructor
+
+    public void update(long value) { // adds value to this metric; no-op when global metrics are disabled
+        if (isGlobalMetricsEnabled) {
+            metric.change(value);
+        }
+    }
+
+    @VisibleForTesting
+    public GlobalMetric getMetric() {
+        return metric;
+    }
+
+    @Override
+    public String toString() {
+        return metric.getCurrentMetricState(); // metric has no toString() override; report name, value, samples
+    }
+
+    private GlobalClientMetrics(MetricType metricType) {
+        this.metric = new GlobalMetricImpl(metricType);
+    }
+
+    public void increment() { // adds 1 to this metric; no-op when global metrics are disabled
+        if (isGlobalMetricsEnabled) {
+            metric.increment();
+        }
+    }
+
+    public static Collection<GlobalMetric> getMetrics() { // new list of every constant's metric, in declaration order
+        List<GlobalMetric> metrics = new ArrayList<>();
+        for (GlobalClientMetrics m : GlobalClientMetrics.values()) {
+            metrics.add(m.metric);
+        }
+        return metrics;
+    }
+
+    public static boolean isMetricsEnabled() {
+        return isGlobalMetricsEnabled;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java
new file mode 100644
index 0000000..f3b562f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+/**
+ * Interface that exposes the various internal phoenix metrics collected
+ * at the JVM level. Because metrics are dynamic in nature, it is not guaranteed that the
+ * values exposed (e.g. sample count and total sum) will always be in sync with each other.
+ * One should use these metrics primarily for monitoring and debugging purposes.
+ */
+public interface GlobalMetric extends Metric {
+    
+    /**
+     * @return Number of samples collected since the last {@link #reset()} call.
+     */
+    public long getNumberOfSamples();
+    
+    /**
+     * @return Sum of the values of the metric sampled since the last {@link #reset()} call.
+     */
+    public long getTotalSum();
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
new file mode 100644
index 0000000..26a16e1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class GlobalMetricImpl implements GlobalMetric {
+    // Pairs an AtomicMetric running total with a count of how many updates (samples) contributed to it.
+    private final AtomicLong numberOfSamples = new AtomicLong(0);
+    private final Metric metric; // final: assigned once in the constructor and never replaced
+
+    public GlobalMetricImpl(MetricType type) {
+        this.metric = new AtomicMetric(type);
+    }
+
+    /**
+     * Reset the internal state. Typically called after metric information has been collected and a new phase of
+     * collection is being requested for the next interval.
+     */
+    @Override
+    public void reset() {
+        metric.reset();
+        numberOfSamples.set(0);
+    }
+
+    @Override
+    public long getNumberOfSamples() {
+        return numberOfSamples.get();
+    }
+
+    @Override
+    public long getTotalSum() {
+        return metric.getValue(); // the total sum is the wrapped metric's current value
+    }
+
+    @Override
+    public void change(long delta) {
+        metric.change(delta); // total and count are updated separately; readers may see them out of step
+        numberOfSamples.incrementAndGet();
+    }
+
+    @Override
+    public void increment() {
+        metric.increment(); // counts as one sample, same as change(1)
+        numberOfSamples.incrementAndGet();
+    }
+
+    @Override
+    public String getName() {
+        return metric.getName();
+    }
+
+    @Override
+    public String getDescription() {
+        return metric.getDescription();
+    }
+
+    @Override
+    public long getValue() {
+        return metric.getValue();
+    }
+
+    @Override
+    public String getCurrentMetricState() {
+        return metric.getCurrentMetricState() + ", Number of samples: " + numberOfSamples.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
new file mode 100644
index 0000000..0e82ce4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
+
+/**
+ * Holds the two memory metrics (chunk bytes and wait time) allotted from a {@link ReadMetricQueue} for a request.
+ */
+public class MemoryMetricsHolder {
+    private final CombinableMetric memoryChunkSizeMetric;
+    private final CombinableMetric memoryWaitTimeMetric;
+    public static final MemoryMetricsHolder NO_OP_INSTANCE = new MemoryMetricsHolder(new ReadMetricQueue(false), null);
+    // Allots one MEMORY_CHUNK_BYTES and one MEMORY_WAIT_TIME metric for tableName from the given queue.
+    public MemoryMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+        this.memoryChunkSizeMetric = readMetrics.allotMetric(MEMORY_CHUNK_BYTES, tableName);
+        this.memoryWaitTimeMetric = readMetrics.allotMetric(MEMORY_WAIT_TIME, tableName);
+    }
+
+    public CombinableMetric getMemoryChunkSizeMetric() {
+        return memoryChunkSizeMetric;
+    }
+
+    public CombinableMetric getMemoryWaitTimeMetric() {
+        return memoryWaitTimeMetric;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
index aef792c..1ad1c7a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java
@@ -18,47 +18,46 @@
 package org.apache.phoenix.monitoring;
 
 /**
- * Interface that exposes the various internal phoenix metrics collected.
- * Because metrics are dynamic in nature, it is not guaranteed that the
- * state exposed will always be in sync with each other. One should use
- * these metrics primarily for monitoring and debugging purposes. 
+ * Interface that represents phoenix-internal metric.
  */
 public interface Metric {
-    
     /**
-     * 
      * @return Name of the metric
      */
     public String getName();
-    
+
     /**
-     * 
      * @return Description of the metric
      */
     public String getDescription();
-    
+
     /**
-     * Reset the internal state. Typically called after
-     * metric information has been collected and a new
-     * phase of collection is being requested for the next
-     * interval.
+     * @return Current value of the metric
      */
-    public void reset();
-    
+    public long getValue();
+
     /**
+     * Change the metric by the specified amount
      * 
-     * @return String that represents the current state of the metric.
-     * Typically used to log the current state.
+     * @param delta
+     *            amount by which the metric value should be changed
      */
-    public String getCurrentMetricState();
-    
+    public void change(long delta);
+
+    /**
+     * Change the value of metric by 1
+     */
+    public void increment();
+
     /**
-     * @return Number of samples collected since the last {@link #reset()} call.
+     * @return String that represents the current state of the metric. Typically used for logging or reporting purposes.
      */
-    public long getNumberOfSamples();
+    public String getCurrentMetricState();
     
     /**
-     * @return Sum of the values of the metric sampled since the last {@link #reset()} call.
+     * Reset the metric
      */
-    public long getTotalSum();
+    public void reset();
+
 }
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
new file mode 100644
index 0000000..a0c2a4a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+/**
+ * Enumerates the types of metrics used in this package. Each constant carries a human-readable description of
+ * what the metric measures; the constant's {@link #name()} serves as the metric's name.
+ */
+public enum MetricType {
+
+    MUTATION_BATCH_SIZE("Batch sizes of mutations"),
+    MUTATION_BYTES("Size of mutations in bytes"),
+    MUTATION_COMMIT_TIME("Time it took to commit mutations"),
+    QUERY_TIME("Query times"),
+    NUM_PARALLEL_SCANS("Number of scans that were executed in parallel"),
+    SCAN_BYTES("Number of bytes read by scans"),
+    MEMORY_CHUNK_BYTES("Number of bytes allocated by the memory manager"),
+    MEMORY_WAIT_TIME("Number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
+    MUTATION_SQL_COUNTER("Counter for number of mutation sql statements"),
+    SELECT_SQL_COUNTER("Counter for number of sql queries"),
+    TASK_QUEUE_WAIT_TIME("Time in milliseconds tasks had to wait in the queue of the thread pool executor"),
+    TASK_END_TO_END_TIME("Time in milliseconds spent by tasks from creation to completion"),
+    TASK_EXECUTION_TIME("Time in milliseconds tasks took to execute"),
+    TASK_EXECUTED_COUNTER("Counter for number of tasks submitted to the thread pool executor"),
+    TASK_REJECTED_COUNTER("Counter for number of tasks that were rejected by the thread pool executor"),
+    QUERY_TIMEOUT_COUNTER("Number of times query timed out"),
+    QUERY_FAILED_COUNTER("Number of times query failed"),
+    SPOOL_FILE_SIZE("Size of spool files created in bytes"),
+    SPOOL_FILE_COUNTER("Number of spool files created"),
+    CACHE_REFRESH_SPLITS_COUNTER("Number of times cache was refreshed because of splits"),
+    WALL_CLOCK_TIME_MS("Wall clock time elapsed for the overall query execution"),
+    RESULT_SET_TIME_MS("Wall clock time elapsed for reading all records using resultSet.next()");
+    
+    // Human-readable explanation of what the metric measures.
+    private final String description;
+
+    private MetricType(String description) {
+        this.description = description;
+    }
+
+    /**
+     * @return the human-readable description this constant was declared with
+     */
+    public String description() {
+        return description;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java
new file mode 100644
index 0000000..bffb9ad
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * Stop watch that only measures time when metrics are enabled. When metrics are disabled every operation is a
+ * no-op and the elapsed time is always reported as zero; otherwise calls are delegated to a Guava
+ * {@code Stopwatch}.
+ */
+final class MetricsStopWatch {
+
+    private final boolean isMetricsEnabled;
+    private final Stopwatch stopwatch;
+
+    /**
+     * @param isMetricsEnabled
+     *            whether this stop watch should actually measure time
+     */
+    MetricsStopWatch(boolean isMetricsEnabled) {
+        this.isMetricsEnabled = isMetricsEnabled;
+        this.stopwatch = new Stopwatch();
+    }
+
+    /**
+     * Starts the stop watch if metrics are enabled. Starting a stop watch that is already running is a no-op,
+     * mirroring the guard in {@link #stop()}.
+     */
+    void start() {
+        // Stopwatch.start() throws IllegalStateException when the watch is already running, so guard the call.
+        if (isMetricsEnabled && !stopwatch.isRunning()) {
+            stopwatch.start();
+        }
+    }
+
+    /**
+     * Stops the stop watch if metrics are enabled and the watch is currently running; otherwise does nothing.
+     */
+    void stop() {
+        // Stopwatch.stop() throws IllegalStateException when the watch is not running, so guard the call.
+        if (isMetricsEnabled && stopwatch.isRunning()) {
+            stopwatch.stop();
+        }
+    }
+
+    /**
+     * @return elapsed time in milliseconds currently shown on the underlying stop watch, or 0 when metrics are
+     *         disabled
+     */
+    long getElapsedTimeInMs() {
+        if (!isMetricsEnabled) {
+            return 0;
+        }
+        return stopwatch.elapsedMillis();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
new file mode 100644
index 0000000..e90da46
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Queue that accumulates write/mutation related phoenix request metrics, keyed by table name.
+ */
+public class MutationMetricQueue {
+
+    // table name -> mutation metrics accumulated so far for that table
+    private final Map<String, MutationMetric> tableMutationMetric = new HashMap<>();
+
+    /**
+     * Records mutation metrics against a table. If the table already has metrics, the given metric is combined
+     * into the existing one; otherwise the given metric instance itself (not a copy) becomes the table's entry.
+     *
+     * @param tableName
+     *            name of the table the metrics belong to
+     * @param metric
+     *            metrics to record for the table
+     */
+    public void addMetricsForTable(String tableName, MutationMetric metric) {
+        MutationMetric existing = tableMutationMetric.get(tableName);
+        if (existing != null) {
+            existing.combineMetric(metric);
+            return;
+        }
+        tableMutationMetric.put(tableName, metric);
+    }
+
+    /**
+     * Adds every per-table metric held by the other queue to this queue via
+     * {@link #addMetricsForTable(String, MutationMetric)}.
+     *
+     * @param other
+     *            queue whose metrics should be folded into this one
+     */
+    public void combineMetricQueues(MutationMetricQueue other) {
+        for (Entry<String, MutationMetric> tableEntry : other.tableMutationMetric.entrySet()) {
+            addMetricsForTable(tableEntry.getKey(), tableEntry.getValue());
+        }
+    }
+
+    /**
+     * Builds a snapshot of the metrics collected so far. This method does not modify the queue; use
+     * {@link #clearMetrics()} to discard the collected metrics.
+     *
+     * @return map of table name -> (metric name -> metric value)
+     */
+    public Map<String, Map<String, Long>> aggregate() {
+        Map<String, Map<String, Long>> snapshot = new HashMap<>();
+        for (Entry<String, MutationMetric> tableEntry : tableMutationMetric.entrySet()) {
+            MutationMetric tableMetric = tableEntry.getValue();
+            // Table names are unique map keys, so every table gets exactly one freshly created value map.
+            Map<String, Long> valuesByName = new HashMap<>();
+            putNameAndValue(valuesByName, tableMetric.getNumMutations());
+            putNameAndValue(valuesByName, tableMetric.getMutationsSizeBytes());
+            putNameAndValue(valuesByName, tableMetric.getCommitTimeForMutations());
+            snapshot.put(tableEntry.getKey(), valuesByName);
+        }
+        return snapshot;
+    }
+
+    // Stores the metric's current value in the target map under the metric's name.
+    private static void putNameAndValue(Map<String, Long> target, CombinableMetric metric) {
+        target.put(metric.getName(), metric.getValue());
+    }
+
+    /**
+     * Discards all metrics collected so far.
+     */
+    public void clearMetrics() {
+        tableMutationMetric.clear(); // help gc
+    }
+
+    /**
+     * Groups together the metrics tracked for mutations: number of mutations, their size in bytes and the time
+     * taken to commit them.
+     */
+    public static class MutationMetric {
+        private final CombinableMetric numMutations = new CombinableMetricImpl(MUTATION_BATCH_SIZE);
+        private final CombinableMetric mutationsSizeBytes = new CombinableMetricImpl(MUTATION_BYTES);
+        private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME);
+
+        /**
+         * @param numMutations
+         *            amount to apply to the mutation batch size metric
+         * @param mutationsSizeBytes
+         *            amount to apply to the mutation bytes metric
+         * @param commitTimeForMutations
+         *            amount to apply to the mutation commit time metric
+         */
+        public MutationMetric(long numMutations, long mutationsSizeBytes, long commitTimeForMutations) {
+            this.numMutations.change(numMutations);
+            this.mutationsSizeBytes.change(mutationsSizeBytes);
+            this.totalCommitTimeForMutations.change(commitTimeForMutations);
+        }
+
+        /**
+         * @return metric tracking the number of mutations
+         */
+        public CombinableMetric getNumMutations() {
+            return numMutations;
+        }
+
+        /**
+         * @return metric tracking the size of mutations in bytes
+         */
+        public CombinableMetric getMutationsSizeBytes() {
+            return mutationsSizeBytes;
+        }
+
+        /**
+         * @return metric tracking the time taken to commit mutations
+         */
+        public CombinableMetric getCommitTimeForMutations() {
+            return totalCommitTimeForMutations;
+        }
+
+        /**
+         * Combines each of the other instance's metrics into the corresponding metric of this instance.
+         *
+         * @param other
+         *            mutation metric to fold into this one
+         */
+        public void combineMetric(MutationMetric other) {
+            this.numMutations.combine(other.numMutations);
+            this.mutationsSizeBytes.combine(other.mutationsSizeBytes);
+            this.totalCommitTimeForMutations.combine(other.totalCommitTimeForMutations);
+        }
+
+    }
+
+    /**
+     * No-op mutation metric queue. Used in places where request level metric tracking for mutations is not needed
+     * or desired: it never stores anything and always aggregates to an empty map.
+     */
+    public static class NoOpMutationMetricsQueue extends MutationMetricQueue {
+
+        public static final NoOpMutationMetricsQueue NO_OP_MUTATION_METRICS_QUEUE = new NoOpMutationMetricsQueue();
+
+        private NoOpMutationMetricsQueue() {}
+
+        @Override
+        public void addMetricsForTable(String tableName, MutationMetric metric) {
+            // intentionally does nothing
+        }
+
+        @Override
+        public Map<String, Map<String, Long>> aggregate() {
+            return Collections.emptyMap();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
new file mode 100644
index 0000000..2d92116
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+/**
+ * {@link Metric} implementation backed by a plain {@code long}, with no atomicity or visibility guarantees of its
+ * own. It is meant for metrics that are never modified/accessed concurrently by multiple threads and for which the
+ * memory consistency effects of happen-before are established externally. For example - phoenix client side
+ * metrics are modified/accessed by only one thread at a time, and the actions of threads in the phoenix client
+ * thread pool happen-before the actions of the thread that performs the aggregation of metrics. This makes
+ * {@link NonAtomicMetric} a good fit for storing Phoenix's client side request level metrics.
+ */
+class NonAtomicMetric implements Metric {
+
+    private final MetricType type;
+    private long value;
+
+    /**
+     * @param type
+     *            metric type that supplies this metric's name and description
+     */
+    public NonAtomicMetric(MetricType type) {
+        this.type = type;
+    }
+
+    /**
+     * @return name of the metric type
+     */
+    @Override
+    public String getName() {
+        return this.type.name();
+    }
+
+    /**
+     * @return description of the metric type
+     */
+    @Override
+    public String getDescription() {
+        return this.type.description();
+    }
+
+    /**
+     * @return current value of the metric
+     */
+    @Override
+    public long getValue() {
+        return this.value;
+    }
+
+    /**
+     * Adds the given amount to the metric value.
+     *
+     * @param delta
+     *            amount by which the metric value should be changed
+     */
+    @Override
+    public void change(long delta) {
+        this.value = this.value + delta;
+    }
+
+    /**
+     * Increases the metric value by 1.
+     */
+    @Override
+    public void increment() {
+        this.value += 1;
+    }
+
+    /**
+     * @return the metric name and current value, formatted as {@code name: value}
+     */
+    @Override
+    public String getCurrentMetricState() {
+        String name = getName();
+        return name + ": " + this.value;
+    }
+
+    /**
+     * Sets the metric value back to zero.
+     */
+    @Override
+    public void reset() {
+        this.value = 0L;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
new file mode 100644
index 0000000..1f71542
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.CACHE_REFRESH_SPLITS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.RESULT_SET_TIME_MS;
+import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_TIME_MS;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+
+/**
+ * Class that represents the overall metrics associated with a query being executed by Phoenix. When constructed
+ * with metrics disabled, every metric is the shared no-op metric and the stop watches do not measure anything.
+ */
+public class OverAllQueryMetrics {
+    private final MetricsStopWatch queryWatch;
+    private final MetricsStopWatch resultSetWatch;
+    private final CombinableMetric numParallelScans;
+    private final CombinableMetric wallClockTimeMS;
+    private final CombinableMetric resultSetTimeMS;
+    private final CombinableMetric queryTimedOut;
+    private final CombinableMetric queryFailed;
+    private final CombinableMetric cacheRefreshedDueToSplits;
+
+    // Portion of each stop watch's accumulated elapsed time that has already been accounted for. The stop watches
+    // keep accumulating across start/stop cycles, so only the difference since the last recording may be added to
+    // the time metrics; otherwise repeated stops or a second start/stop cycle would double count.
+    private long recordedQueryTimeMs;
+    private long recordedResultSetTimeMs;
+
+    /**
+     * @param isMetricsEnabled
+     *            whether metrics should actually be collected
+     */
+    public OverAllQueryMetrics(boolean isMetricsEnabled) {
+        queryWatch = new MetricsStopWatch(isMetricsEnabled);
+        resultSetWatch = new MetricsStopWatch(isMetricsEnabled);
+        numParallelScans = newMetric(isMetricsEnabled, NUM_PARALLEL_SCANS);
+        wallClockTimeMS = newMetric(isMetricsEnabled, WALL_CLOCK_TIME_MS);
+        resultSetTimeMS = newMetric(isMetricsEnabled, RESULT_SET_TIME_MS);
+        queryTimedOut = newMetric(isMetricsEnabled, QUERY_TIMEOUT_COUNTER);
+        queryFailed = newMetric(isMetricsEnabled, QUERY_FAILED_COUNTER);
+        cacheRefreshedDueToSplits = newMetric(isMetricsEnabled, CACHE_REFRESH_SPLITS_COUNTER);
+    }
+
+    // Creates a real metric of the given type when metrics are enabled, the shared no-op metric otherwise.
+    private static CombinableMetric newMetric(boolean isMetricsEnabled, MetricType type) {
+        return isMetricsEnabled ? new CombinableMetricImpl(type) : NoOpRequestMetric.INSTANCE;
+    }
+
+    /**
+     * @param numParallelScans
+     *            amount by which the number-of-parallel-scans metric should be changed
+     */
+    public void updateNumParallelScans(long numParallelScans) {
+        this.numParallelScans.change(numParallelScans);
+    }
+
+    /**
+     * Records that the query timed out.
+     */
+    public void queryTimedOut() {
+        queryTimedOut.increment();
+    }
+
+    /**
+     * Records that the query failed.
+     */
+    public void queryFailed() {
+        queryFailed.increment();
+    }
+
+    /**
+     * Records that the cache was refreshed because of splits.
+     */
+    public void cacheRefreshedDueToSplits() {
+        cacheRefreshedDueToSplits.increment();
+    }
+
+    /**
+     * Starts timing the query.
+     */
+    public void startQuery() {
+        queryWatch.start();
+    }
+
+    /**
+     * Stops timing the query and adds the time measured since the last recording to the wall clock time metric.
+     * Calling this again without an intervening {@link #startQuery()} adds nothing.
+     */
+    public void endQuery() {
+        queryWatch.stop();
+        long elapsedMs = queryWatch.getElapsedTimeInMs();
+        wallClockTimeMS.change(elapsedMs - recordedQueryTimeMs);
+        recordedQueryTimeMs = elapsedMs;
+    }
+
+    /**
+     * Starts timing the reading of the result set.
+     */
+    public void startResultSetWatch() {
+        resultSetWatch.start();
+    }
+
+    /**
+     * Stops timing the reading of the result set and adds the time measured since the last recording to the result
+     * set time metric. Calling this again without an intervening {@link #startResultSetWatch()} adds nothing.
+     */
+    public void stopResultSetWatch() {
+        resultSetWatch.stop();
+        long elapsedMs = resultSetWatch.getElapsedTimeInMs();
+        resultSetTimeMS.change(elapsedMs - recordedResultSetTimeMs);
+        recordedResultSetTimeMs = elapsedMs;
+    }
+
+    /**
+     * @return map of metric name -> current metric value for all the metrics tracked by this class
+     */
+    public Map<String, Long> publish() {
+        Map<String, Long> metricsForPublish = new HashMap<>();
+        metricsForPublish.put(numParallelScans.getName(), numParallelScans.getValue());
+        metricsForPublish.put(wallClockTimeMS.getName(), wallClockTimeMS.getValue());
+        metricsForPublish.put(resultSetTimeMS.getName(), resultSetTimeMS.getValue());
+        metricsForPublish.put(queryTimedOut.getName(), queryTimedOut.getValue());
+        metricsForPublish.put(queryFailed.getName(), queryFailed.getValue());
+        metricsForPublish.put(cacheRefreshedDueToSplits.getName(), cacheRefreshedDueToSplits.getValue());
+        return metricsForPublish;
+    }
+
+    /**
+     * Resets all the metrics and stops both stop watches. Time measured before the reset is not carried over into
+     * the time metrics recorded afterwards.
+     */
+    public void reset() {
+        numParallelScans.reset();
+        wallClockTimeMS.reset();
+        resultSetTimeMS.reset();
+        queryTimedOut.reset();
+        queryFailed.reset();
+        cacheRefreshedDueToSplits.reset();
+        queryWatch.stop();
+        resultSetWatch.stop();
+        // The stop watches keep their accumulated time, so treat everything measured so far as already recorded.
+        recordedQueryTimeMs = queryWatch.getElapsedTimeInMs();
+        recordedResultSetTimeMs = resultSetWatch.getElapsedTimeInMs();
+    }
+
+    /**
+     * Combines the counters of the given metrics (cache refreshes due to splits, failed queries, timed out queries
+     * and number of parallel scans) into this instance. The wall clock and result set time metrics are not
+     * combined.
+     *
+     * @param metric
+     *            metrics to fold into this instance
+     * @return this instance
+     */
+    public OverAllQueryMetrics combine(OverAllQueryMetrics metric) {
+        cacheRefreshedDueToSplits.combine(metric.cacheRefreshedDueToSplits);
+        queryFailed.combine(metric.queryFailed);
+        queryTimedOut.combine(metric.queryTimedOut);
+        numParallelScans.combine(metric.numParallelScans);
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
deleted file mode 100644
index 28e2f2e..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.monitoring;
-
-/**
- * Central place where we keep track of all the internal
- * phoenix metrics that we track.
- * 
- */
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.phoenix.query.QueryServicesOptions;
-
-public class PhoenixMetrics {
-    private static final boolean isMetricsEnabled = QueryServicesOptions.withDefaults().isMetricsEnabled();
-
-    public static boolean isMetricsEnabled() {
-        return isMetricsEnabled;
-    }
-
-    public enum SizeMetric {
-        MUTATION_BATCH_SIZE("CumulativeBatchSizesOfMutations", "Cumulative batch sizes of mutations"),
-        MUTATION_BYTES("CumulativeMutationSize", "Cumulative size of mutations in bytes"),
-        MUTATION_COMMIT_TIME("CumulativeMutationTime", "Cumulative time it took to send mutations"),
-        QUERY_TIME("QueryTime", "Cumulative query times"),
-        PARALLEL_SCANS("CumulativeNumberOfParallelScans", "Cumulative number of scans executed that were executed in parallel"),
-        SCAN_BYTES("CumulativeScanBytesSize", "Cumulative number of bytes read by scans"),
-        SPOOL_FILE_SIZE("CumulativeSpoolFilesSize", "Cumulative size of spool files created in bytes"),
-        MEMORY_MANAGER_BYTES("CumulativeBytesAllocated", "Cumulative number of bytes allocated by the memory manager"),
-        MEMORY_WAIT_TIME("CumulativeMemoryWaitTime", "Cumulative number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
-        TASK_QUEUE_WAIT_TIME("CumulativeTaskQueueWaitTime", "Cumulative time in milliseconds tasks had to wait in the queue of the thread pool executor"),
-        TASK_END_TO_END_TIME("CumulativeTaskEndToEndTime", "Cumulative time in milliseconds spent by tasks from creation to completion"),
-        TASK_EXECUTION_TIME("CumulativeTaskExecutionTime", "Cumulative time in milliseconds tasks took to execute");
-
-        private final SizeStatistic metric;
-
-        private SizeMetric(String metricName, String metricDescription) {
-            metric = new SizeStatistic(metricName, metricDescription);
-        }
-
-        public void update(long value) {
-            if (isMetricsEnabled) {
-                metric.add(value);
-            }
-        }
-        
-        // exposed for testing.
-        public Metric getMetric() {
-            return metric;
-        }
-        
-        @Override
-        public String toString() {
-            return metric.toString();
-        }
-    }
-
-    public enum CountMetric {
-        MUTATION_COUNT("NumMutationCounter", "Counter for number of mutation statements"),
-        QUERY_COUNT("NumQueryCounter", "Counter for number of queries"),
-        TASK_COUNT("NumberOfTasksCounter", "Counter for number of tasks submitted to the thread pool executor"),
-        REJECTED_TASK_COUNT("RejectedTasksCounter", "Counter for number of tasks that were rejected by the thread pool executor"),
-        QUERY_TIMEOUT("QueryTimeoutCounter", "Number of times query timed out"),
-        FAILED_QUERY("QueryFailureCounter", "Number of times query failed"),
-        NUM_SPOOL_FILE("NumSpoolFilesCounter", "Number of spool files created");
-
-        private final Counter metric;
-
-        private CountMetric(String metricName, String metricDescription) {
-            metric = new Counter(metricName, metricDescription);
-        }
-
-        public void increment() {
-            if (isMetricsEnabled) {
-                metric.increment();
-            }
-        }
-        
-        // exposed for testing.
-        public Metric getMetric() {
-            return metric;
-        }
-        
-        @Override
-        public String toString() {
-            return metric.toString();
-        }
-    }
-    
-    public static Collection<Metric> getMetrics() {
-        List<Metric> metrics = new ArrayList<>();
-        for (SizeMetric s : SizeMetric.values()) {
-            metrics.add(s.metric);
-        }
-        for (CountMetric s : CountMetric.values()) {
-            metrics.add(s.metric);
-        }
-        return metrics;
-    }
-
-}    
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
new file mode 100644
index 0000000..e6c6be2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nonnull;
+
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Queue of all metrics associated with performing reads from the cluster. Metrics are grouped in per
+ * (metric type, table name) queues and can be aggregated into per-table maps of metric name to combined value.
+ */
+public class ReadMetricQueue {
+
+    // Capacity of each per-key queue; offers beyond this capacity are rejected by the queue.
+    private static final int MAX_QUEUE_SIZE = 20000; // TODO: should this be configurable?
+
+    // (metric type, table name) -> metrics allotted for that key
+    private final ConcurrentMap<MetricKey, Queue<CombinableMetric>> metricsMap = new ConcurrentHashMap<>();
+
+    private final boolean isRequestMetricsEnabled;
+
+    /**
+     * @param isRequestMetricsEnabled
+     *            whether request level metrics should be tracked; when false {@link #allotMetric} hands out the
+     *            shared no-op metric and nothing is queued
+     */
+    public ReadMetricQueue(boolean isRequestMetricsEnabled) {
+        this.isRequestMetricsEnabled = isRequestMetricsEnabled;
+    }
+
+    /**
+     * Creates a metric of the given type, adds it to the queue for the (type, table name) key and returns it.
+     * When request metrics are disabled the shared no-op metric is returned and nothing is queued.
+     * 
+     * @param type
+     *            type of the metric to allot
+     * @param tableName
+     *            name of the table the metric is for
+     * @return the allotted metric
+     */
+    public CombinableMetric allotMetric(MetricType type, String tableName) {
+        if (!isRequestMetricsEnabled) { return NoOpRequestMetric.INSTANCE; }
+        MetricKey key = new MetricKey(type, tableName);
+        Queue<CombinableMetric> q = getMetricQueue(key);
+        CombinableMetric metric = getMetric(type);
+        // NOTE(review): the result of offer() is ignored. The queue is bounded by MAX_QUEUE_SIZE, so when it is
+        // full the metric is still returned to the caller but is not tracked by this queue.
+        q.offer(metric);
+        return metric;
+    }
+
+    /**
+     * @param type
+     *            type of the metric to create
+     * @return a new metric of the given type
+     */
+    @VisibleForTesting
+    public CombinableMetric getMetric(MetricType type) {
+        CombinableMetric metric = new CombinableMetricImpl(type);
+        return metric;
+    }
+
+    /**
+     * Combines the metrics queued for every key and groups the results by table name. Keys whose queue is empty
+     * are skipped.
+     * 
+     * @return map of table name -> (metric name -> combined metric value)
+     */
+    public Map<String, Map<String, Long>> aggregate() {
+        Map<String, Map<String, Long>> publishedMetrics = new HashMap<>();
+        for (Entry<MetricKey, Queue<CombinableMetric>> entry : metricsMap.entrySet()) {
+            String tableNameToPublish = entry.getKey().tableName;
+            Collection<CombinableMetric> metrics = entry.getValue();
+            if (metrics.size() > 0) {
+                // NOTE(review): combine() folds the queued metrics into the first queued element. If
+                // CombinableMetric.combine mutates its receiver, calling aggregate() repeatedly without
+                // clearMetrics() would count the later elements more than once - confirm against the
+                // CombinableMetric implementation.
+                CombinableMetric m = combine(metrics);
+                // Several metric types can map to the same table, so the per-table map is created lazily.
+                Map<String, Long> map = publishedMetrics.get(tableNameToPublish);
+                if (map == null) {
+                    map = new HashMap<>();
+                    publishedMetrics.put(tableNameToPublish, map);
+                }
+                map.put(m.getName(), m.getValue());
+            }
+        }
+        return publishedMetrics;
+    }
+    
+    /**
+     * Removes all the queues and the metrics they hold.
+     */
+    public void clearMetrics() {
+        metricsMap.clear(); // help gc
+    }
+
+    /**
+     * Folds all the metrics of the collection together by calling {@code combine} on the running result, starting
+     * with the first element.
+     * 
+     * @throws IllegalArgumentException
+     *             if the collection is empty
+     */
+    private static CombinableMetric combine(Collection<CombinableMetric> metrics) {
+        int size = metrics.size();
+        if (size == 0) { throw new IllegalArgumentException("Metrics collection needs to have at least one element"); }
+        Iterator<CombinableMetric> itr = metrics.iterator();
+        CombinableMetric combinedMetric = itr.next();
+        while (itr.hasNext()) {
+            combinedMetric = combinedMetric.combine(itr.next());
+        }
+        return combinedMetric;
+    }
+
+    /**
+     * Combine the metrics. This method should only be called in a single threaded manner when the two metric holders
+     * are not getting modified. For every key of the other queue, its metrics are combined into a single metric
+     * which is then added to this queue's queue for the same key.
+     * 
+     * @param other
+     *            queue whose metrics should be added to this queue
+     * @return this queue
+     */
+    public ReadMetricQueue combineReadMetrics(ReadMetricQueue other) {
+        ConcurrentMap<MetricKey, Queue<CombinableMetric>> otherMetricsMap = other.metricsMap;
+        for (Entry<MetricKey, Queue<CombinableMetric>> entry : otherMetricsMap.entrySet()) {
+            MetricKey key = entry.getKey();
+            Queue<CombinableMetric> otherQueue = entry.getValue();
+            CombinableMetric combinedMetric = null;
+            // combine the metrics corresponding to this metric key before putting it in the queue.
+            for (CombinableMetric m : otherQueue) {
+                if (combinedMetric == null) {
+                    // The first metric of the other queue is used as the accumulator; no copy is made.
+                    combinedMetric = m;
+                } else {
+                    // NOTE(review): unlike combine(Collection), the value returned by combine() is ignored
+                    // here - presumably combine() accumulates into its receiver; confirm.
+                    combinedMetric.combine(m);
+                }
+            }
+            if (combinedMetric != null) {
+                Queue<CombinableMetric> thisQueue = getMetricQueue(key);
+                // The result of offer() is ignored; a full queue rejects the combined metric.
+                thisQueue.offer(combinedMetric);
+            }
+        }
+        return this;
+    }
+
+    /**
+     * Inner class whose instances are used as keys in the metrics map. Two keys are equal when both their table
+     * name and their metric type are equal.
+     */
+    private static class MetricKey {
+        @Nonnull
+        private final MetricType type;
+
+        @Nonnull
+        private final String tableName;
+
+        MetricKey(MetricType type, String tableName) {
+            checkNotNull(type);
+            checkNotNull(tableName);
+            this.type = type;
+            this.tableName = tableName;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + tableName.hashCode();
+            result = prime * result + type.hashCode();
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (obj == null) return false;
+            if (getClass() != obj.getClass()) return false;
+            MetricKey other = (MetricKey)obj;
+            if (tableName.equals(other.tableName) && type == other.type) return true;
+            return false;
+        }
+
+    }
+
+    /**
+     * Returns the queue registered for the key, registering a new bounded queue first if there is none.
+     */
+    private Queue<CombinableMetric> getMetricQueue(MetricKey key) {
+        Queue<CombinableMetric> q = metricsMap.get(key);
+        if (q == null) {
+            q = new LinkedBlockingQueue<CombinableMetric>(MAX_QUEUE_SIZE);
+            // putIfAbsent returns the queue already registered by another caller, if any; use that one so that
+            // all callers share a single queue per key.
+            Queue<CombinableMetric> curQ = metricsMap.putIfAbsent(key, q);
+            if (curQ != null) {
+                q = curQ;
+            }
+        }
+        return q;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
deleted file mode 100644
index 9eca754..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.monitoring;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * 
- * Statistic that keeps track of the sum of long values that 
- * could be used to represent a phoenix metric. For performance 
- * reasons the internal state in this metric is not strictly covariant
- * and hence should only be used for monitoring and debugging purposes. 
- */
-class SizeStatistic implements Metric {
-    
-    private final AtomicLong total = new AtomicLong(0);
-    private final AtomicLong numSamples = new AtomicLong(0);
-    private final String name;
-    private final String description;
-    
-    public SizeStatistic(String name, String description) {
-        this.name = name;
-        this.description = description;
-    }
-    
-    @Override
-    public String getName() {
-        return name;
-    }
-    
-    @Override
-    public String getDescription() {
-        return description;
-    }   
-
-    @Override
-    public void reset() {
-        total.set(0);
-        numSamples.set(0);
-    }
-    
-    @Override
-    public String getCurrentMetricState() {
-        return "Name:" + description + ", Total: " + total.get() + ", Number of samples: " + numSamples.get();
-    }
-
-    @Override
-    public long getNumberOfSamples() {
-        return numSamples.get();
-    }
-
-    @Override
-    public long getTotalSum() {
-        return total.get();
-    }
-    
-    public long add(long value) {
-        // there is a race condition here but what the heck.
-        numSamples.incrementAndGet();
-        return total.addAndGet(value);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
new file mode 100644
index 0000000..4373887
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+
+/**
+ * Holder for the spool file size and spool file count metrics tracking the spooling done by phoenix for a request.
+ * {@link #NO_OP_INSTANCE} is a shared holder backed by {@code new ReadMetricQueue(false)} and an empty table name.
+ */
+public class SpoolingMetricsHolder {
+
+    public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(false), "");
+    private final CombinableMetric spoolFileSizeMetric;
+    private final CombinableMetric numSpoolFileMetric;
+
+    public SpoolingMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+        spoolFileSizeMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_SIZE, tableName);
+        numSpoolFileMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_COUNTER, tableName);
+    }
+
+    public CombinableMetric getSpoolFileSizeMetric() {
+        return spoolFileSizeMetric;
+    }
+
+    public CombinableMetric getNumSpoolFileMetric() {
+        return numSpoolFileMetric;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
new file mode 100644
index 0000000..98ff57c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
+
+
+/**
+ * Holder for the metrics tracked when a task is submitted to and executed by the phoenix client thread pool.
+ * {@link #NO_OP_INSTANCE} is a shared holder backed by {@code new ReadMetricQueue(false)} and an empty table name.
+ */
+public class TaskExecutionMetricsHolder {
+
+    public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(false), "");
+    private final CombinableMetric taskQueueWaitTime;
+    private final CombinableMetric taskEndToEndTime;
+    private final CombinableMetric taskExecutionTime;
+    private final CombinableMetric numTasks;
+    private final CombinableMetric numRejectedTasks;
+
+    public TaskExecutionMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+        this.taskQueueWaitTime = readMetrics.allotMetric(TASK_QUEUE_WAIT_TIME, tableName);
+        this.taskEndToEndTime = readMetrics.allotMetric(TASK_END_TO_END_TIME, tableName);
+        this.taskExecutionTime = readMetrics.allotMetric(TASK_EXECUTION_TIME, tableName);
+        this.numTasks = readMetrics.allotMetric(TASK_EXECUTED_COUNTER, tableName);
+        this.numRejectedTasks = readMetrics.allotMetric(TASK_REJECTED_COUNTER, tableName);
+    }
+
+    public CombinableMetric getTaskQueueWaitTime() {
+        return taskQueueWaitTime;
+    }
+
+    public CombinableMetric getTaskEndToEndTime() {
+        return taskEndToEndTime;
+    }
+
+    public CombinableMetric getTaskExecutionTime() {
+        return taskExecutionTime;
+    }
+
+    public CombinableMetric getNumTasks() {
+        return numTasks;
+    }
+
+    public CombinableMetric getNumRejectedTasks() {
+        return numRejectedTasks;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
index 898a919..c16b86d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
@@ -45,7 +45,7 @@ public abstract class BaseQueryServicesImpl implements QueryServices {
                 options.getKeepAliveMs(), 
                 options.getThreadPoolSize(), 
                 options.getQueueSize(),
-                options.isMetricsEnabled());
+                options.isGlobalMetricsEnabled());
         this.memoryManager = new GlobalMemoryManager(
                 Runtime.getRuntime().maxMemory() * options.getMaxMemoryPerc() / 100,
                 options.getMaxMemoryWaitMs());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 3e7d084..825cc83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -157,7 +157,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String DELAY_FOR_SCHEMA_UPDATE_CHECK = "phoenix.schema.change.delay";
     public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells";
     public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls";
-    public static final String METRICS_ENABLED = "phoenix.query.metrics.enabled";
+    public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled";
     
     // rpc queue configs
     public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
@@ -165,6 +165,7 @@ public interface QueryServices extends SQLCloseable {
     
     public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder";
     public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions";
+    public static final String COLLECT_REQUEST_LEVEL_METRICS = "phoenix.query.request.metrics.enabled";
     
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 02c695e..4e8879b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -18,15 +18,16 @@
 package org.apache.phoenix.query;
 
 import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE;
-import static org.apache.phoenix.query.QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.GLOBAL_METRICS_ENABLED;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
@@ -43,7 +44,6 @@ import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LI
 import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.METRICS_ENABLED;
 import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
@@ -187,13 +187,14 @@ public class QueryServicesOptions {
 
     // TODO Change this to true as part of PHOENIX-1543
     public static final boolean DEFAULT_AUTO_COMMIT = false;
-    public static final boolean DEFAULT_IS_METRICS_ENABLED = true;
+    public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true;
     
     private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
     
     public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false;
     public static final boolean DEFAULT_FORCE_ROW_KEY_ORDER = false;
     public static final boolean DEFAULT_ALLOW_USER_DEFINED_FUNCTIONS = false;
+    public static final boolean DEFAULT_REQUEST_LEVEL_METRICS_ENABLED = false;
 
     private final Configuration config;
 
@@ -246,10 +247,11 @@ public class QueryServicesOptions {
             .setIfUnset(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE)
             .setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK)
             .setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK)
-            .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED)
+            .setIfUnset(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED)
             .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY)
             .setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX)
-            .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER);
+            .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER)
+            .setIfUnset(COLLECT_REQUEST_LEVEL_METRICS, DEFAULT_REQUEST_LEVEL_METRICS_ENABLED)
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
@@ -445,10 +447,10 @@ public class QueryServicesOptions {
         return config.getInt(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES);
     }
     
-    public boolean isMetricsEnabled() {
-        return config.getBoolean(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED);
+    public boolean isGlobalMetricsEnabled() {
+        return config.getBoolean(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED);
     }
-    
+
     public boolean isUseByteBasedRegex() {
         return config.getBoolean(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX);
     }
@@ -526,11 +528,7 @@ public class QueryServicesOptions {
         return this;
     
     }
-    public QueryServicesOptions setMetricsEnabled(boolean flag) {
-        config.setBoolean(METRICS_ENABLED, flag);
-        return this;
-    }
-
+    
     public QueryServicesOptions setUseByteBasedRegex(boolean flag) {
         config.setBoolean(USE_BYTE_BASED_REGEX_ATTRIB, flag);
         return this;
@@ -540,4 +538,5 @@ public class QueryServicesOptions {
         config.setBoolean(FORCE_ROW_KEY_ORDER_ATTRIB, forceRowKeyOrder);
         return this;
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
index 265fc78..159e0c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java
@@ -17,12 +17,23 @@
  */
 package org.apache.phoenix.trace;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import org.apache.commons.configuration.Configuration;
+import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
+import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
+import static org.apache.phoenix.metrics.MetricInfo.END;
+import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import static org.apache.phoenix.metrics.MetricInfo.PARENT;
+import static org.apache.phoenix.metrics.MetricInfo.SPAN;
+import static org.apache.phoenix.metrics.MetricInfo.START;
+import static org.apache.phoenix.metrics.MetricInfo.TAG;
+import static org.apache.phoenix.metrics.MetricInfo.TRACE;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,20 +42,15 @@ import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.phoenix.metrics.*;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.metrics.Metrics;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.QueryUtil;
 
-import javax.annotation.Nullable;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.*;
-
-import static org.apache.phoenix.metrics.MetricInfo.*;
-import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 
 /**
  * Write the metrics to a phoenix table.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e29d57b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index 06534d1..cbe5a1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -154,4 +154,9 @@ public class JDBCUtil {
         }
         return Boolean.valueOf(autoCommit);
     }
+    
+    public static boolean isCollectingRequestLevelMetricsEnabled(String url, Properties overrideProps, ReadOnlyProps queryServicesProps) throws SQLException {
+        String requestMetricsStr = findProperty(url, overrideProps, PhoenixRuntime.REQUEST_METRIC_ATTRIB); // value found by findProperty wins when non-null
+        return (requestMetricsStr != null ? Boolean.parseBoolean(requestMetricsStr) : queryServicesProps.getBoolean(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, QueryServicesOptions.DEFAULT_REQUEST_LEVEL_METRICS_ENABLED));
+    }
 }