You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2018/05/14 20:33:30 UTC

[1/2] phoenix git commit: PHOENIX-4701 Write client-side metrics asynchronously to SYSTEM.LOG

Repository: phoenix
Updated Branches:
  refs/heads/master 1966edb19 -> 7afaceb7e


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
index 3de2be1..1256f5c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -27,6 +27,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.phoenix.log.LogLevel;
+
 /**
  * Queue that tracks various writes/mutations related phoenix request metrics.
  */
@@ -81,12 +83,16 @@ public class MutationMetricQueue {
      * Class that holds together the various metrics associated with mutations.
      */
     public static class MutationMetric {
-        private final CombinableMetric numMutations = new CombinableMetricImpl(MUTATION_BATCH_SIZE);
-        private final CombinableMetric mutationsSizeBytes = new CombinableMetricImpl(MUTATION_BYTES);
-        private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME);
-        private final CombinableMetric numFailedMutations = new CombinableMetricImpl(MUTATION_BATCH_FAILED_SIZE);
-
-        public MutationMetric(long numMutations, long mutationsSizeBytes, long commitTimeForMutations, long numFailedMutations) {
+        private final CombinableMetric numMutations;;
+        private final CombinableMetric mutationsSizeBytes;
+        private final CombinableMetric totalCommitTimeForMutations;
+        private final CombinableMetric numFailedMutations;
+
+        public MutationMetric(LogLevel connectionLogLevel, long numMutations, long mutationsSizeBytes, long commitTimeForMutations, long numFailedMutations) {
+            this.numMutations = MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BATCH_SIZE);
+            this.mutationsSizeBytes =MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BYTES);
+            this.totalCommitTimeForMutations =MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_COMMIT_TIME);
+            this.numFailedMutations = MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BATCH_FAILED_SIZE);
             this.numMutations.change(numMutations);
             this.mutationsSizeBytes.change(mutationsSizeBytes);
             this.totalCommitTimeForMutations.change(commitTimeForMutations);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
index b995267..3121ecd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
@@ -27,7 +27,7 @@ import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_TIME_MS;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+import org.apache.phoenix.log.LogLevel;
 
 /**
  * Class that represents the overall metrics associated with a query being executed by the phoenix.
@@ -42,16 +42,15 @@ public class OverAllQueryMetrics {
     private final CombinableMetric queryFailed;
     private final CombinableMetric cacheRefreshedDueToSplits;
 
-    public OverAllQueryMetrics(boolean isMetricsEnabled) {
-        queryWatch = new MetricsStopWatch(isMetricsEnabled);
-        resultSetWatch = new MetricsStopWatch(isMetricsEnabled);
-        numParallelScans = isMetricsEnabled ? new CombinableMetricImpl(NUM_PARALLEL_SCANS) : NoOpRequestMetric.INSTANCE;
-        wallClockTimeMS = isMetricsEnabled ? new CombinableMetricImpl(WALL_CLOCK_TIME_MS) : NoOpRequestMetric.INSTANCE;
-        resultSetTimeMS = isMetricsEnabled ? new CombinableMetricImpl(RESULT_SET_TIME_MS) : NoOpRequestMetric.INSTANCE;
-        queryTimedOut = isMetricsEnabled ? new CombinableMetricImpl(QUERY_TIMEOUT_COUNTER) : NoOpRequestMetric.INSTANCE;
-        queryFailed = isMetricsEnabled ? new CombinableMetricImpl(QUERY_FAILED_COUNTER) : NoOpRequestMetric.INSTANCE;
-        cacheRefreshedDueToSplits = isMetricsEnabled ? new CombinableMetricImpl(CACHE_REFRESH_SPLITS_COUNTER)
-                : NoOpRequestMetric.INSTANCE;
+    public OverAllQueryMetrics(LogLevel connectionLogLevel) {
+        queryWatch = new MetricsStopWatch(WALL_CLOCK_TIME_MS.isLoggingEnabled(connectionLogLevel));
+        resultSetWatch = new MetricsStopWatch(RESULT_SET_TIME_MS.isLoggingEnabled(connectionLogLevel));
+        numParallelScans = MetricUtil.getCombinableMetric(connectionLogLevel, NUM_PARALLEL_SCANS);
+        wallClockTimeMS = MetricUtil.getCombinableMetric(connectionLogLevel, WALL_CLOCK_TIME_MS);
+        resultSetTimeMS = MetricUtil.getCombinableMetric(connectionLogLevel, RESULT_SET_TIME_MS);
+        queryTimedOut = MetricUtil.getCombinableMetric(connectionLogLevel, QUERY_TIMEOUT_COUNTER);
+        queryFailed = MetricUtil.getCombinableMetric(connectionLogLevel, QUERY_FAILED_COUNTER);
+        cacheRefreshedDueToSplits = MetricUtil.getCombinableMetric(connectionLogLevel, CACHE_REFRESH_SPLITS_COUNTER);
     }
 
     public void updateNumParallelScans(long numParallelScans) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
index c008635..fa81e9c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
@@ -25,6 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.annotation.Nonnull;
 
+import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -40,19 +41,23 @@ public class ReadMetricQueue {
 
     private final List<ScanMetricsHolder> scanMetricsHolderList = new ArrayList<ScanMetricsHolder>();
 
-    private final boolean isRequestMetricsEnabled;
 
-    public ReadMetricQueue(boolean isRequestMetricsEnabled) {
-        this.isRequestMetricsEnabled = isRequestMetricsEnabled;
+    private LogLevel connectionLogLevel;
+
+    public ReadMetricQueue(LogLevel connectionLogLevel) {
+        this.connectionLogLevel = connectionLogLevel;
     }
 
     public CombinableMetric allotMetric(MetricType type, String tableName) {
-        if (!isRequestMetricsEnabled) { return NoOpRequestMetric.INSTANCE; }
-        MetricKey key = new MetricKey(type, tableName);
-        Queue<CombinableMetric> q = getMetricQueue(key);
-        CombinableMetric metric = getMetric(type);
-        q.offer(metric);
-        return metric;
+        if (type.isLoggingEnabled(connectionLogLevel)) {
+            MetricKey key = new MetricKey(type, tableName);
+            Queue<CombinableMetric> q = getMetricQueue(key);
+            CombinableMetric metric = getMetric(type);
+            q.offer(metric);
+            return metric;
+        } else {
+            return NoOpRequestMetric.INSTANCE;
+        }
     }
 
     @VisibleForTesting
@@ -173,10 +178,6 @@ public class ReadMetricQueue {
         return q;
     }
 
-    public boolean isRequestMetricsEnabled() {
-        return isRequestMetricsEnabled;
-    }
-    
     public void addScanHolder(ScanMetricsHolder holder){
         scanMetricsHolderList.add(holder);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
index 9125cd8..494b3e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
@@ -34,6 +34,7 @@ import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.JsonMapper;
+import org.apache.phoenix.log.LogLevel;
 
 public class ScanMetricsHolder {
 
@@ -52,13 +53,11 @@ public class ScanMetricsHolder {
     private Object scan;
 
     private static final ScanMetricsHolder NO_OP_INSTANCE =
-            new ScanMetricsHolder(new ReadMetricQueue(false), "",null);
+            new ScanMetricsHolder(new ReadMetricQueue(LogLevel.OFF), "",null);
 
     public static ScanMetricsHolder getInstance(ReadMetricQueue readMetrics, String tableName,
-            Scan scan, boolean isRequestMetricsEnabled) {
-        if (!isRequestMetricsEnabled) {
-            return NO_OP_INSTANCE;
-        }
+            Scan scan, LogLevel connectionLogLevel) {
+        if (connectionLogLevel == LogLevel.OFF) { return NO_OP_INSTANCE; }
         scan.setScanMetricsEnabled(true);
         return new ScanMetricsHolder(readMetrics, tableName, scan);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
index 4373887..699982f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.monitoring;
 
+import org.apache.phoenix.log.LogLevel;
 
 /**
  * Class that encapsulates the various metrics associated with the spooling done by phoenix as part of servicing a
@@ -26,7 +27,7 @@ public class SpoolingMetricsHolder {
 
     private final CombinableMetric spoolFileSizeMetric;
     private final CombinableMetric numSpoolFileMetric;
-    public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(false), "");
+    public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(LogLevel.OFF), "");
 
     public SpoolingMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
         this.spoolFileSizeMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_SIZE, tableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
index 98ff57c..6117b40 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
@@ -23,6 +23,8 @@ import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
 import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
 
+import org.apache.phoenix.log.LogLevel;
+
 
 /**
  * Class to encapsulate the various metrics associated with submitting and executing a task to the phoenix client
@@ -35,7 +37,7 @@ public class TaskExecutionMetricsHolder {
     private final CombinableMetric taskExecutionTime;
     private final CombinableMetric numTasks;
     private final CombinableMetric numRejectedTasks;
-    public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(false), "");
+    public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(LogLevel.OFF), "");
     
     public TaskExecutionMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
         taskQueueWaitTime = readMetrics.allotMetric(TASK_QUEUE_WAIT_TIME, tableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 1d3feed..8b7b708 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -2471,7 +2471,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     // Available for testing
     protected String getLogTableDDL() {
-        return QueryConstants.CREATE_LOG_METADATA;
+        return setSystemLogDDLProperties(QueryConstants.CREATE_LOG_METADATA);
+    }
+
+    private String setSystemLogDDLProperties(String ddl) {
+        return String.format(ddl, props.getInt(LOG_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_LOG_SALT_BUCKETS));
+
     }
 
     private String setSystemDDLProperties(String ddl) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 3531a87..a7fad8e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -164,7 +164,12 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     }
 
     protected String getLogTableDDL() {
-        return QueryConstants.CREATE_LOG_METADATA;
+        return setSystemLogDDLProperties(QueryConstants.CREATE_LOG_METADATA);
+    }
+    
+    private String setSystemLogDDLProperties(String ddl) {
+        return String.format(ddl, props.getInt(LOG_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_LOG_SALT_BUCKETS));
+
     }
 
     private String setSystemDDLProperties(String ddl) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 65806ae..52abed0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -18,112 +18,7 @@
 package org.apache.phoenix.query;
 
 
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BUFFER_LENGTH;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHAR_OCTET_LENGTH;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POST_KEY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITERATED;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_PREC_RADIX;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REF_GENERATION;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REMARKS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_CATALOG;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SELF_REFERENCING_COL_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SEQUENCE_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SEQUENCE_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SOURCE_DATA_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATA_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.*;
 
 import java.math.BigDecimal;
 
@@ -134,9 +29,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.SystemFunctionSplitPolicy;
@@ -412,28 +309,31 @@ public interface QueryConstants {
             PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
     
     public static final String CREATE_LOG_METADATA =
-            "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"(\n" +
+            "CREATE IMMUTABLE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"(\n" +
              // Pk columns
+             START_TIME + " DECIMAL, \n" +
+             TABLE_NAME + " VARCHAR, \n" +
+             QUERY_ID + " VARCHAR NOT NULL,\n" +
             TENANT_ID + " VARCHAR ," +
-            QUERY_ID + " VARCHAR NOT NULL,\n" +
             USER + " VARCHAR , \n" +
             CLIENT_IP + " VARCHAR, \n" +
             // Function metadata (will be null for argument row)
             QUERY +  " VARCHAR, \n" +
             EXPLAIN_PLAN + " VARCHAR, \n" +
             // Argument metadata (will be null for function row)
-            START_TIME + " TIMESTAMP, \n" +
-            TOTAL_EXECUTION_TIME + " BIGINT, \n" +
             NO_OF_RESULTS_ITERATED + " BIGINT, \n" +
             QUERY_STATUS + " VARCHAR, \n" +
             EXCEPTION_TRACE + " VARCHAR, \n" +
             GLOBAL_SCAN_DETAILS + " VARCHAR, \n" +
             BIND_PARAMETERS + " VARCHAR, \n" +
             SCAN_METRICS_JSON + " VARCHAR, \n" +
-            " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (QUERY_ID))\n" +
+            MetricType.getMetricColumnsDetails()+"\n"+
+            " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (START_TIME, TABLE_NAME, QUERY_ID))\n" +
+            PhoenixDatabaseMetaData.SALT_BUCKETS + "=%s,\n"+
             PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE+ ",\n" +
             HColumnDescriptor.TTL + "=" + MetaDataProtocol.DEFAULT_LOG_TTL+",\n"+
-            PhoenixDatabaseMetaData.COLUMN_ENCODED_BYTES +" = 0";
+            TableProperty.IMMUTABLE_STORAGE_SCHEME.toString() + " = " + ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS.name() + ",\n" +
+            TableProperty.COLUMN_ENCODED_BYTES.toString()+" = 1";
     
     public static final byte[] OFFSET_FAMILY = "f_offset".getBytes();
     public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index db0b10b..559d165 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -197,6 +197,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String COMMIT_STATS_ASYNC = "phoenix.stats.commit.async";
     // Maximum size in bytes taken up by cached table stats in the client
     public static final String STATS_MAX_CACHE_SIZE = "phoenix.stats.cache.maxSize";
+    public static final String LOG_SALT_BUCKETS_ATTRIB = "phoenix.log.saltBuckets";
 
     public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets";
     public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 0e6e89f..d708785 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -356,6 +356,7 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = false;
     public static final String DEFAULT_LOGGING_LEVEL = LogLevel.OFF.name();
     public static final String DEFAULT_LOG_SAMPLE_RATE = "1.0";
+    public static final int DEFAULT_LOG_SALT_BUCKETS = 32;
 
     private final Configuration config;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 20ac732..8f14680 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -2286,7 +2286,7 @@ public class MetaDataClient {
                 }
             }
             // System tables have hard-coded column qualifiers. So we can't use column encoding for them.
-            else if (!SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, tableName)))) {
+            else if (!SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, tableName)))|| SchemaUtil.isLogTable(schemaName, tableName)) {
                 /*
                  * Indexes inherit the storage scheme of the parent data tables. Otherwise, we always attempt to 
                  * create tables with encoded column names. 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 9d2e53c..94cbfea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -436,7 +436,7 @@ public final class QueryUtil {
         }
         return url;
     }
-
+    
     private static int getInt(String key, int defaultValue, Properties props, Configuration conf) {
         if (conf == null) {
             Preconditions.checkNotNull(props);
@@ -487,5 +487,10 @@ public final class QueryUtil {
     public static String getViewPartitionClause(String partitionColumnName, long autoPartitionNum) {
         return partitionColumnName  + " " + toSQL(CompareOp.EQUAL) + " " + autoPartitionNum;
     }
+
+    public static Connection getConnectionForQueryLog(Configuration config) throws ClassNotFoundException, SQLException {
+        //we don't need this connection to upgrade anything or start dispatcher
+        return getConnectionOnServer(config);
+    }
     
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 92a2cde..dd00a69 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -1170,6 +1170,10 @@ public class SchemaUtil {
         Cell isNamespaceMappedCell = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
         return isNamespaceMappedCell!=null && (boolean) PBoolean.INSTANCE.toObject(isNamespaceMappedCell.getValue());
     }
+
+    public static boolean isLogTable(String schemaName, String tableName) {
+        return PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(schemaName) && PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE.equals(tableName);
+    }
     
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
index e0a731d..d858e72 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
@@ -24,10 +24,12 @@ import java.util.Arrays;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.memory.DelegatingMemoryManager;
 import org.apache.phoenix.memory.GlobalMemoryManager;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.monitoring.MemoryMetricsHolder;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.monitoring.SpoolingMetricsHolder;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
@@ -54,7 +56,10 @@ public class SpoolingResultIteratorTest {
             };
 
         MemoryManager memoryManager = new DelegatingMemoryManager(new GlobalMemoryManager(threshold));
-        ResultIterator scanner = new SpoolingResultIterator(SpoolingMetricsHolder.NO_OP_INSTANCE, MemoryMetricsHolder.NO_OP_INSTANCE, iterator, memoryManager, threshold, maxSizeSpool,"/tmp");
+        ResultIterator scanner = new SpoolingResultIterator(
+                SpoolingMetricsHolder.NO_OP_INSTANCE,
+                new MemoryMetricsHolder(new ReadMetricQueue(LogLevel.OFF), ""), iterator, memoryManager, threshold,
+                maxSizeSpool, "/tmp");
         AssertResults.assertResults(scanner, expectedResults);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
index c10c4d1..2ff8aca 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
@@ -112,11 +112,10 @@ public class PhoenixRecordReader<T extends DBWritable> implements
             String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
             long renewScannerLeaseThreshold = queryPlan.getContext().getConnection()
                     .getQueryServices().getRenewLeaseThresholdMilliSeconds();
-            boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
             for (Scan scan : scans) {
                 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes
                         .toBytes(true));
-                ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan, isRequestMetricsEnabled);
+                ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan, ctx.getConnection().getLogLevel());
                 final TableResultIterator tableResultIterator = new TableResultIterator(
                         queryPlan.getContext().getConnection().getMutationState(), scan, scanMetricsHolder,
                         renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance());


[2/2] phoenix git commit: PHOENIX-4701 Write client-side metrics asynchronously to SYSTEM.LOG

Posted by an...@apache.org.
PHOENIX-4701 Write client-side metrics asynchronously to SYSTEM.LOG


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7afaceb7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7afaceb7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7afaceb7

Branch: refs/heads/master
Commit: 7afaceb7e7355e59ae9465a02b812b230fc58edd
Parents: 1966edb
Author: Ankit Singhal <an...@gmail.com>
Authored: Mon May 14 13:33:22 2018 -0700
Committer: Ankit Singhal <an...@gmail.com>
Committed: Mon May 14 13:33:22 2018 -0700

----------------------------------------------------------------------
 bin/hbase-site.xml                              |   4 +
 .../apache/phoenix/end2end/QueryLoggerIT.java   |  76 ++++++----
 .../phoenix/monitoring/PhoenixMetricsIT.java    |  28 ++--
 .../phoenix/compile/StatementContext.java       |   4 +-
 .../apache/phoenix/execute/MutationState.java   |   4 +-
 .../phoenix/iterate/ChunkedResultIterator.java  |   2 +-
 .../phoenix/iterate/ParallelIterators.java      |   3 +-
 .../apache/phoenix/iterate/SerialIterators.java |   3 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  10 +-
 .../apache/phoenix/jdbc/PhoenixResultSet.java   |  54 ++++---
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  21 ++-
 .../java/org/apache/phoenix/log/LogLevel.java   |   2 +-
 .../java/org/apache/phoenix/log/LogWriter.java  |   6 +-
 .../org/apache/phoenix/log/QueryLogInfo.java    |  38 +++--
 .../org/apache/phoenix/log/QueryLogState.java   |  22 ---
 .../org/apache/phoenix/log/QueryLogger.java     |  74 +++++++---
 .../org/apache/phoenix/log/QueryLoggerUtil.java |  62 ++++----
 .../org/apache/phoenix/log/QueryStatus.java     |  22 +++
 .../org/apache/phoenix/log/RingBufferEvent.java |  38 +++--
 .../phoenix/log/RingBufferEventTranslator.java  |  21 ++-
 .../org/apache/phoenix/log/TableLogWriter.java  | 144 +++++++++++--------
 .../phoenix/mapreduce/PhoenixRecordReader.java  |  13 +-
 .../phoenix/monitoring/MemoryMetricsHolder.java |   1 -
 .../apache/phoenix/monitoring/MetricType.java   | 123 ++++++++++------
 .../apache/phoenix/monitoring/MetricUtil.java   |  30 ++++
 .../phoenix/monitoring/MutationMetricQueue.java |  18 ++-
 .../phoenix/monitoring/OverAllQueryMetrics.java |  21 ++-
 .../phoenix/monitoring/ReadMetricQueue.java     |  27 ++--
 .../phoenix/monitoring/ScanMetricsHolder.java   |   9 +-
 .../monitoring/SpoolingMetricsHolder.java       |   3 +-
 .../monitoring/TaskExecutionMetricsHolder.java  |   4 +-
 .../query/ConnectionQueryServicesImpl.java      |   7 +-
 .../query/ConnectionlessQueryServicesImpl.java  |   7 +-
 .../apache/phoenix/query/QueryConstants.java    | 124 ++--------------
 .../org/apache/phoenix/query/QueryServices.java |   1 +
 .../phoenix/query/QueryServicesOptions.java     |   1 +
 .../apache/phoenix/schema/MetaDataClient.java   |   2 +-
 .../java/org/apache/phoenix/util/QueryUtil.java |   7 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |   4 +
 .../iterate/SpoolingResultIteratorTest.java     |   7 +-
 .../hive/mapreduce/PhoenixRecordReader.java     |   3 +-
 41 files changed, 570 insertions(+), 480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/bin/hbase-site.xml
----------------------------------------------------------------------
diff --git a/bin/hbase-site.xml b/bin/hbase-site.xml
index 0ab9fd8..2f360e2 100644
--- a/bin/hbase-site.xml
+++ b/bin/hbase-site.xml
@@ -24,4 +24,8 @@
     <name>hbase.regionserver.wal.codec</name>
     <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
   </property>
+  <property>
+    <name>phoenix.log.level</name>
+    <value>DEBUG</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
index 940ba6f..618d7d9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
@@ -31,7 +31,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -53,8 +52,10 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.log.LogLevel;
-import org.apache.phoenix.log.QueryLogState;
+import org.apache.phoenix.log.QueryStatus;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.BeforeClass;
@@ -77,6 +78,19 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
         DriverManager.registerDriver(PhoenixDriver.INSTANCE);
     } 
     
+    private static class MyClock extends EnvironmentEdge {
+        public volatile long time;
+
+        public MyClock (long time) {
+            this.time = time;
+        }
+
+        @Override
+        public long currentTime() {
+            return time;
+        }
+    }
+    
 
     @Test
     public void testDebugLogs() throws Exception {
@@ -97,12 +111,13 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
         ResultSet explainRS = conn.createStatement().executeQuery("Explain " + query);
 
         String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
-        rs = conn.createStatement().executeQuery(logQuery);
-        boolean foundQueryLog = false;
         int delay = 5000;
 
         // sleep for sometime to let query log committed
         Thread.sleep(delay);
+        rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
+
         while (rs.next()) {
             if (rs.getString(QUERY_ID).equals(queryId)) {
                 foundQueryLog = true;
@@ -113,11 +128,9 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
                 assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON());
                 assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10);
                 assertEquals(rs.getString(QUERY), query);
-                assertEquals(rs.getString(QUERY_STATUS), QueryLogState.COMPLETED.toString());
-                assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay);
+                assertEquals(rs.getString(QUERY_STATUS), QueryStatus.COMPLETED.toString());
                 assertEquals(rs.getString(TENANT_ID), null);
-                assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
-                assertTrue(rs.getString(SCAN_METRICS_JSON).contains("scanMetrics"));
+                assertTrue(rs.getString(SCAN_METRICS_JSON)==null);
                 assertEquals(rs.getString(EXCEPTION_TRACE),null);
             }else{
                 //confirm we are not logging system queries
@@ -140,7 +153,10 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
         String query = "SELECT * FROM " + tableName;
         int count=100;
         for (int i = 0; i < count; i++) {
-            conn.createStatement().executeQuery(query);
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            while(rs.next()){
+                
+            }
         }
         
         String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
@@ -178,12 +194,12 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
         }
 
         String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
-        rs = conn.createStatement().executeQuery(logQuery);
-        boolean foundQueryLog = false;
         int delay = 5000;
 
         // sleep for sometime to let query log committed
         Thread.sleep(delay);
+        rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
         while (rs.next()) {
             if (rs.getString(QUERY_ID).equals(queryId)) {
                 foundQueryLog = true;
@@ -191,12 +207,10 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
                 assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress());
                 assertEquals(rs.getString(EXPLAIN_PLAN), null);
                 assertEquals(rs.getString(GLOBAL_SCAN_DETAILS),null);
-                assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0);
+                assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10);
                 assertEquals(rs.getString(QUERY), query);
-                assertEquals(rs.getString(QUERY_STATUS),null);
-                assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay);
+                assertEquals(rs.getString(QUERY_STATUS),QueryStatus.COMPLETED.toString());
                 assertEquals(rs.getString(TENANT_ID), null);
-                assertTrue(rs.getString(TOTAL_EXECUTION_TIME) == null);
             }
         }
         assertTrue(foundQueryLog);
@@ -222,12 +236,12 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
         }
 
         String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
-        rs = conn.createStatement().executeQuery(logQuery);
-        boolean foundQueryLog = false;
         int delay = 5000;
 
         // sleep for sometime to let query log committed
         Thread.sleep(delay);
+        rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
         while (rs.next()) {
             if (rs.getString(QUERY_ID).equals(queryId)) {
                 foundQueryLog = true;
@@ -255,7 +269,9 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
         props.setProperty(QueryServices.LOG_LEVEL, loglevel.name());
         Connection conn = DriverManager.getConnection(getUrl(),props);
         assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),loglevel);
-        
+        final MyClock clock = new MyClock(100);
+        EnvironmentEdgeManager.injectEdge(clock);
+        try{
         String query = "SELECT * FROM " + tableName +" where V = ?";
         
         PreparedStatement pstmt = conn.prepareStatement(query);
@@ -270,12 +286,12 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
         ResultSet explainRS = conn.createStatement()
                 .executeQuery("Explain " + "SELECT * FROM " + tableName + " where V = 'value5'");
         String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
-        rs = conn.createStatement().executeQuery(logQuery);
-        boolean foundQueryLog = false;
         int delay = 5000;
-        
+
         // sleep for sometime to let query log committed
         Thread.sleep(delay);
+        rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
         while (rs.next()) {
             if (rs.getString(QUERY_ID).equals(queryId)) {
                 foundQueryLog = true;
@@ -286,14 +302,18 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
                 assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON());
                 assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 1);
                 assertEquals(rs.getString(QUERY), query);
-                assertEquals(rs.getString(QUERY_STATUS), QueryLogState.COMPLETED.toString());
-                assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay);
+                assertEquals(rs.getString(QUERY_STATUS), QueryStatus.COMPLETED.toString());
+                assertTrue(LogLevel.TRACE == loglevel ? rs.getString(SCAN_METRICS_JSON).contains("scanMetrics")
+                        : rs.getString(SCAN_METRICS_JSON) == null);
+                assertEquals(rs.getTimestamp(START_TIME).getTime(),100);
                 assertEquals(rs.getString(TENANT_ID), null);
-                assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
             }
         }
         assertTrue(foundQueryLog);
         conn.close();
+        }finally{
+            EnvironmentEdgeManager.injectEdge(null);
+        }
     }
     
     
@@ -315,14 +335,14 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
             assertEquals(e.getErrorCode(), SQLExceptionCode.TABLE_UNDEFINED.getErrorCode());
         }
         String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
-        ResultSet rs = conn.createStatement().executeQuery(logQuery);
-        boolean foundQueryLog = false;
         int delay = 5000;
 
         // sleep for sometime to let query log committed
         Thread.sleep(delay);
+        ResultSet rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
         while (rs.next()) {
-            if (QueryLogState.FAILED.name().equals(rs.getString(QUERY_STATUS))) {
+            if (QueryStatus.FAILED.name().equals(rs.getString(QUERY_STATUS))) {
                 foundQueryLog = true;
                 assertEquals(rs.getString(USER), System.getProperty("user.name"));
                 assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress());
@@ -331,8 +351,6 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
                 assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0);
                 assertEquals(rs.getString(QUERY), query);
                 assertTrue(rs.getString(EXCEPTION_TRACE).contains(SQLExceptionCode.TABLE_UNDEFINED.getMessage()));
-                assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay);
-                assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
             }
         }
         assertTrue(foundQueryLog);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index e45ddcd..73cdf0a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -10,6 +10,10 @@
 package org.apache.phoenix.monitoring;
 
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_FAILED_COUNT;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
@@ -29,19 +33,6 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQ
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
-
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_CALLS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_NOT_SERVING_REGION_EXCEPTION;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_RETRIES;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_SCANNED;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_FILTERED;
-
 import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
@@ -73,11 +64,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.LoggingPhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.jdbc.PhoenixMetricsLog;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
-import org.apache.phoenix.jdbc.LoggingPhoenixConnection;
+import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -109,6 +101,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
         // disable renewing leases as this will force spooling to happen.
         props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
+        props.put(QueryServices.LOG_LEVEL, LogLevel.DEBUG.toString());
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
         // need the non-test driver for some tests that check number of hconnections, etc.
         DriverManager.registerDriver(PhoenixDriver.INSTANCE);
@@ -377,6 +370,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         insertRowsInTable(tableName, numRows);
         Properties props = new Properties();
         props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "false");
+        props.setProperty(QueryServices.LOG_LEVEL, LogLevel.OFF.name());
         Connection conn = DriverManager.getConnection(getUrl(), props);
         ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
         while (rs.next()) {}
@@ -706,7 +700,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
 
     private void changeInternalStateForTesting(PhoenixResultSet rs) {
         // get and set the internal state for testing purposes.
-        ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(true);
+        ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(LogLevel.DEBUG);
         StatementContext ctx = (StatementContext)Whitebox.getInternalState(rs, "context");
         Whitebox.setInternalState(ctx, "readMetricsQueue", testMetricsQueue);
         Whitebox.setInternalState(rs, "readMetricsQueue", testMetricsQueue);
@@ -772,8 +766,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
 
     private class TestReadMetricsQueue extends ReadMetricQueue {
 
-        public TestReadMetricsQueue(boolean isRequestMetricsEnabled) {
-            super(isRequestMetricsEnabled);
+        public TestReadMetricsQueue(LogLevel connectionLogLevel) {
+            super(connectionLogLevel);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 3ea5dd5..4358ee3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -135,8 +135,8 @@ public class StatementContext {
         this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer> emptyMap() : Maps
                 .<PColumn, Integer> newLinkedHashMap();
         this.subqueryResults = Maps.<SelectStatement, Object> newHashMap();
-        this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled);
-        this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled);
+        this.readMetricsQueue = new ReadMetricQueue(connection.getLogLevel());
+        this.overAllQueryMetrics = new OverAllQueryMetrics(connection.getLogLevel());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 18f4fea..f3a383e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -39,8 +39,8 @@ import javax.annotation.concurrent.Immutable;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -1181,7 +1181,7 @@ public class MutationState implements SQLCloseable {
 						numFailedMutations = uncommittedStatementIndexes.length;
 						GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations);
                     } finally {
-                    	MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations);
+                    	MutationMetric mutationsMetric = new MutationMetric(connection.getLogLevel(),numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations);
                         mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
                         try {
                             if (cache!=null) 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index 8595fd4..acb6c04 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -147,7 +147,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
             String tableName = tableRef.getTable().getPhysicalName().getString();
             ReadMetricQueue readMetrics = context.getReadMetricsQueue();
             ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
-                readMetrics.isRequestMetricsEnabled());
+                    context.getConnection().getLogLevel());
             long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
             ResultIterator singleChunkResultIterator =
                     new SingleChunkResultIterator(new TableResultIterator(mutationState, scan,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 41d278d..262ae44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -107,11 +107,10 @@ public class ParallelIterators extends BaseResultIterators {
         context.getOverallQueryMetrics().updateNumParallelScans(numScans);
         GLOBAL_NUM_PARALLEL_SCANS.update(numScans);
         final long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
-        boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
         for (final ScanLocator scanLocation : scanLocations) {
             final Scan scan = scanLocation.getScan();
             final ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, physicalTableName,
-                scan, isRequestMetricsEnabled);
+                scan, context.getConnection().getLogLevel());
             final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
             final TableResultIterator tableResultItr =
                     context.getConnection().getTableResultIteratorFactory().newIterator(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index c13fcdb..1693421 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -171,7 +171,6 @@ public class SerialIterators extends BaseResultIterators {
                 return EMPTY_ITERATOR;
             }
             ReadMetricQueue readMetrics = context.getReadMetricsQueue();
-            boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
             while (index < scans.size()) {
                 Scan currentScan = scans.get(index++);
                 if (remainingOffset != null) {
@@ -179,7 +178,7 @@ public class SerialIterators extends BaseResultIterators {
                 }
                 ScanMetricsHolder scanMetricsHolder =
                         ScanMetricsHolder.getInstance(readMetrics, tableName, currentScan,
-                            isRequestMetricsEnabled);
+                            context.getConnection().getLogLevel());
                 TableResultIterator itr =
                         new TableResultIterator(mutationState, currentScan, scanMetricsHolder,
                                 renewLeaseThreshold, plan, scanGrouper, caches);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index d3626f8..312d17b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -364,9 +364,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
                                 function.getTenantId())));
             }
         };
-        this.isRequestLevelMetricsEnabled = JDBCUtil
-                .isCollectingRequestLevelMetricsEnabled(url, info,
-                        this.services.getProps());
+        this.logLevel= LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL,
+                QueryServicesOptions.DEFAULT_LOGGING_LEVEL));
+        this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info,
+                this.services.getProps());
         this.mutationState = mutationState == null ? newMutationState(maxSize,
                 maxSizeBytes) : new MutationState(mutationState);
         this.metaData = metaData;
@@ -380,8 +381,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         this.scannerQueue = new LinkedBlockingQueue<>();
         this.tableResultIteratorFactory = new DefaultTableResultIteratorFactory();
         this.isRunningUpgrade = isRunningUpgrade;
-        this.logLevel= LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL,
-                QueryServicesOptions.DEFAULT_LOGGING_LEVEL));
+        
         this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE,
                 QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE));
         GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 153fa08..84816a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -51,8 +51,8 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.log.QueryLogInfo;
-import org.apache.phoenix.log.QueryLogState;
 import org.apache.phoenix.log.QueryLogger;
+import org.apache.phoenix.log.QueryStatus;
 import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.monitoring.OverAllQueryMetrics;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
@@ -76,8 +76,6 @@ import org.apache.phoenix.util.SQLCloseable;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
 
 
 
@@ -133,9 +131,9 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
 
     private Long count = 0L;
 
-    private QueryLogState logStatus = QueryLogState.COMPLETED;
+    private Object exception;
+
 
-    private RuntimeException exception;
     
     public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, StatementContext ctx) throws SQLException {
         this.rowProjector = rowProjector;
@@ -144,7 +142,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
         this.statement = context.getStatement();
         this.readMetricsQueue = context.getReadMetricsQueue();
         this.overAllQueryMetrics = context.getOverallQueryMetrics();
-        this.queryLogger = context.getQueryLogger();
+        this.queryLogger = context.getQueryLogger() != null ? context.getQueryLogger() : QueryLogger.NO_OP_INSTANCE;
     }
     
     @Override
@@ -181,6 +179,19 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
             statement.getResultSets().remove(this);
             overAllQueryMetrics.endQuery();
             overAllQueryMetrics.stopResultSetWatch();
+            if (!queryLogger.isSynced()) {
+                if(this.exception==null){
+                    queryLogger.log(QueryLogInfo.QUERY_STATUS_I,QueryStatus.COMPLETED.toString());
+                }
+                queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count);
+                if (queryLogger.isDebugEnabled()) {
+                    queryLogger.log(QueryLogInfo.SCAN_METRICS_JSON_I,
+                            readMetricsQueue.getScanMetricsHolderList().toString());
+                    readMetricsQueue.getScanMetricsHolderList().clear();
+                }
+                // if not already synced , like closing before result set exhausted
+                queryLogger.sync(getReadMetrics(), getOverAllRequestReadMetrics());
+            }
         }
     }
 
@@ -799,36 +810,33 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
             }
             rowProjector.reset();
         } catch (RuntimeException e) {
-            this.logStatus=QueryLogState.FAILED;
             // FIXME: Expression.evaluate does not throw SQLException
             // so this will unwrap throws from that.
+            queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString());
+            if (queryLogger.isDebugEnabled()) {
+                queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
+            }
             this.exception = e;
             if (e.getCause() instanceof SQLException) {
                 throw (SQLException) e.getCause();
             }
             throw e;
         }finally{
-            if (currentRow == null && queryLogger != null ) {
+            if (this.exception!=null) {
+                queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count);
                 if (queryLogger.isDebugEnabled()) {
-                    Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
-                    queryLogBuilder.put(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count);
-                    queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I,
-                            System.currentTimeMillis() - queryLogger.getStartTime());
-                    queryLogBuilder.put(QueryLogInfo.SCAN_METRICS_JSON_I,
+                    queryLogger.log(QueryLogInfo.SCAN_METRICS_JSON_I,
                             readMetricsQueue.getScanMetricsHolderList().toString());
-                    if (this.exception != null) {
-                        queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I,
-                                Throwables.getStackTraceAsString(this.exception));
-                    }
                     readMetricsQueue.getScanMetricsHolderList().clear();
-                    queryLogger.log(logStatus, queryLogBuilder.build());
+                }
+                if (queryLogger != null) {
+                    queryLogger.sync(getReadMetrics(), getOverAllRequestReadMetrics());
                 }
             }
-        }
-        if (currentRow == null) {
-            
-            overAllQueryMetrics.endQuery();
-            overAllQueryMetrics.stopResultSetWatch();
+            if (currentRow == null) {
+                overAllQueryMetrics.endQuery();
+                overAllQueryMetrics.stopResultSetWatch();
+            }
         }
         return currentRow != null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 25b9fb0..015f04c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -93,7 +93,7 @@ import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.log.QueryLogInfo;
-import org.apache.phoenix.log.QueryLogState;
+import org.apache.phoenix.log.QueryStatus;
 import org.apache.phoenix.log.QueryLogger;
 import org.apache.phoenix.log.QueryLoggerUtil;
 import org.apache.phoenix.optimize.Cost;
@@ -190,8 +190,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.math.IntMath;
@@ -319,10 +317,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                         StatementContext context = plan.getContext();
                         context.setQueryLogger(queryLogger);
                         if(queryLogger.isDebugEnabled()){
-                            Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
-                            queryLogBuilder.put(QueryLogInfo.EXPLAIN_PLAN_I, QueryUtil.getExplainPlan(resultIterator));
-                            queryLogBuilder.put(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, context.getScan()!=null?context.getScan().toString():null);
-                            queryLogger.log(QueryLogState.COMPILED, queryLogBuilder.build());
+                            queryLogger.log(QueryLogInfo.EXPLAIN_PLAN_I, QueryUtil.getExplainPlan(resultIterator));
+                            queryLogger.log(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, context.getScan()!=null?context.getScan().toString():null);
                         }
                         context.getOverallQueryMetrics().startQuery();
                         PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), plan.getContext());
@@ -351,6 +347,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                         }
                         throw e;
                     }catch (RuntimeException e) {
+                        
                         // FIXME: Expression.evaluate does not throw SQLException
                         // so this will unwrap throws from that.
                         if (e.getCause() instanceof SQLException) {
@@ -367,11 +364,9 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 }, PhoenixContextExecutor.inContext());
         }catch (Exception e) {
             if (queryLogger.isDebugEnabled()) {
-                Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
-                queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I,
-                        System.currentTimeMillis() - queryLogger.getStartTime());
-                queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
-                queryLogger.log(QueryLogState.FAILED, queryLogBuilder.build());
+                queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
+                queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString());
+                queryLogger.sync(null, null);
             }
             Throwables.propagateIfInstanceOf(e, SQLException.class);
             Throwables.propagate(e);
@@ -1781,7 +1776,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         }
         QueryLogger queryLogger = QueryLogger.getInstance(connection,isSystemTable);
         QueryLoggerUtil.logInitialDetails(queryLogger, connection.getTenantId(),
-                connection.getQueryServices(), sql, queryLogger.getStartTime(), getParameters());
+                connection.getQueryServices(), sql, getParameters());
         return queryLogger;
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
index 5792658..269b4f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
@@ -18,5 +18,5 @@
 package org.apache.phoenix.log;
 
 public enum LogLevel {
-    OFF, INFO, DEBUG, TRACE
+    OFF,INFO, DEBUG, TRACE
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
index 817f9ec..dab03e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
@@ -31,16 +31,18 @@ public interface LogWriter {
      * @param event
      * @throws SQLException
      * @throws IOException
+     * @throws ClassNotFoundException 
      */
-    void write(RingBufferEvent event) throws SQLException, IOException;
+    void write(RingBufferEvent event) throws SQLException, IOException, ClassNotFoundException;
 
     /**
      * will be called when disruptor is getting shutdown
      * 
      * @throws IOException
+     * @throws SQLException 
      */
 
-    void close() throws IOException;
+    void close() throws IOException, SQLException;
 
     /**
      * if writer is closed and cannot write further event

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
index 87de267..fb38ba2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
@@ -28,8 +28,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
 
 import org.apache.phoenix.schema.types.PDataType;
@@ -40,29 +40,27 @@ import org.apache.phoenix.schema.types.PVarchar;
 
 public enum QueryLogInfo {
     
-    CLIENT_IP_I(CLIENT_IP, QueryLogState.STARTED, LogLevel.INFO, PVarchar.INSTANCE),
-    QUERY_I(QUERY,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
-    BIND_PARAMETERS_I(BIND_PARAMETERS,QueryLogState.STARTED, LogLevel.TRACE,PVarchar.INSTANCE),
-    QUERY_ID_I(QUERY_ID,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
-    TENANT_ID_I(TENANT_ID,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
-    START_TIME_I(START_TIME,QueryLogState.STARTED, LogLevel.INFO,PTimestamp.INSTANCE),
-    USER_I(USER,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
-    EXPLAIN_PLAN_I(EXPLAIN_PLAN,QueryLogState.COMPILED, LogLevel.DEBUG,PVarchar.INSTANCE),
-    GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS,QueryLogState.COMPILED, LogLevel.DEBUG,PVarchar.INSTANCE),
-    NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED,QueryLogState.COMPLETED, LogLevel.DEBUG,PLong.INSTANCE),
-    EXCEPTION_TRACE_I(EXCEPTION_TRACE,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE),
-    QUERY_STATUS_I(QUERY_STATUS,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE),
-    TOTAL_EXECUTION_TIME_I(TOTAL_EXECUTION_TIME,QueryLogState.COMPLETED, LogLevel.DEBUG,PLong.INSTANCE),
-    SCAN_METRICS_JSON_I(SCAN_METRICS_JSON,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE);
+    CLIENT_IP_I(CLIENT_IP, LogLevel.INFO, PVarchar.INSTANCE),
+    QUERY_I(QUERY, LogLevel.INFO,PVarchar.INSTANCE),
+    BIND_PARAMETERS_I(BIND_PARAMETERS, LogLevel.TRACE,PVarchar.INSTANCE),
+    QUERY_ID_I(QUERY_ID, LogLevel.INFO,PVarchar.INSTANCE),
+    TENANT_ID_I(TENANT_ID, LogLevel.INFO,PVarchar.INSTANCE),
+    START_TIME_I(START_TIME, LogLevel.INFO,PTimestamp.INSTANCE),
+    USER_I(USER, LogLevel.INFO,PVarchar.INSTANCE),
+    EXPLAIN_PLAN_I(EXPLAIN_PLAN,LogLevel.DEBUG,PVarchar.INSTANCE),
+    GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS, LogLevel.DEBUG,PVarchar.INSTANCE),
+    NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED, LogLevel.INFO,PLong.INSTANCE),
+    EXCEPTION_TRACE_I(EXCEPTION_TRACE, LogLevel.DEBUG,PVarchar.INSTANCE),
+    QUERY_STATUS_I(QUERY_STATUS, LogLevel.INFO,PVarchar.INSTANCE),
+    SCAN_METRICS_JSON_I(SCAN_METRICS_JSON, LogLevel.TRACE,PVarchar.INSTANCE), 
+    TABLE_NAME_I(TABLE_NAME, LogLevel.DEBUG,PVarchar.INSTANCE);
     
     public final String columnName;
-    public final QueryLogState logState;
     public final LogLevel logLevel;
     public final PDataType dataType;
 
-    private QueryLogInfo(String columnName, QueryLogState logState, LogLevel logLevel, PDataType dataType) {
+    private QueryLogInfo(String columnName, LogLevel logLevel, PDataType dataType) {
         this.columnName = columnName;
-        this.logState=logState;
         this.logLevel=logLevel;
         this.dataType=dataType;
     }
@@ -71,10 +69,6 @@ public enum QueryLogInfo {
         return columnName;
     }
 
-    public QueryLogState getLogState() {
-        return logState;
-    }
-
     public LogLevel getLogLevel() {
         return logLevel;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
deleted file mode 100644
index e27f0e8..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.log;
-
-public enum QueryLogState {
-    STARTED, PLAN, COMPILED, EXECUTION, COMPLETED,FAILED 
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
index b2fb235..ef5559c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
@@ -17,13 +17,17 @@
  */
 package org.apache.phoenix.log;
 
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 
 import io.netty.util.internal.ThreadLocalRandom;
 
@@ -34,15 +38,17 @@ public class QueryLogger {
     private final ThreadLocal<RingBufferEventTranslator> threadLocalTranslator = new ThreadLocal<>();
     private QueryLoggerDisruptor queryDisruptor;
     private String queryId;
-    private Long startTime;
     private LogLevel logLevel;
+    private Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
+    private boolean isSynced;
     private static final Log LOG = LogFactory.getLog(QueryLoggerDisruptor.class);
     
     private QueryLogger(PhoenixConnection connection) {
         this.queryId = UUID.randomUUID().toString();
         this.queryDisruptor = connection.getQueryServices().getQueryDisruptor();
-        this.startTime = System.currentTimeMillis();
         logLevel = connection.getLogLevel();
+        log(QueryLogInfo.QUERY_ID_I, queryId);
+        log(QueryLogInfo.START_TIME_I, EnvironmentEdgeManager.currentTimeMillis());
     }
     
     private QueryLogger() {
@@ -58,21 +64,32 @@ public class QueryLogger {
         return result;
     }
     
-    private static final QueryLogger NO_OP_INSTANCE = new QueryLogger() {
+    public static final QueryLogger NO_OP_INSTANCE = new QueryLogger() {
         @Override
-        public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> map) {
+        public void log(QueryLogInfo queryLogInfo, Object info) {
 
         }
-        
+
         @Override
-        public boolean isDebugEnabled(){
+        public boolean isDebugEnabled() {
             return false;
         }
-        
+
         @Override
-        public boolean isInfoEnabled(){
+        public boolean isInfoEnabled() {
             return false;
         }
+
+        @Override
+        public void sync(
+                Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) {
+
+        }
+        
+        @Override
+        public boolean isSynced(){
+            return true;
+        }
     };
 
     public static QueryLogger getInstance(PhoenixConnection connection, boolean isSystemTable) {
@@ -82,14 +99,14 @@ public class QueryLogger {
     }
 
     /**
-     * Add query log in the table, columns will be logged depending upon the connection logLevel
-     * @param logState State of the query
-     * @param map Value of the map should be in format of the corresponding data type 
+     * Add query log in the table, columns will be logged depending upon the connection logLevel 
      */
-    public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> map) {
-        final RingBufferEventTranslator translator = getCachedTranslator();
-        translator.setQueryInfo(logState, map, logLevel);
-        publishLogs(translator);
+    public void log(QueryLogInfo queryLogInfo, Object info) {
+        try {
+            queryLogBuilder.put(queryLogInfo, info);
+        } catch (Exception e) {
+            LOG.warn("Unable to add log info because of " + e.getMessage());
+        }
     }
     
     private boolean publishLogs(RingBufferEventTranslator translator) {
@@ -102,13 +119,6 @@ public class QueryLogger {
     }
 
     /**
-     * Start time when the logger was started, if {@link LogLevel#OFF} then it's the current time
-     */
-    public Long getStartTime() {
-        return startTime != null ? startTime : System.currentTimeMillis();
-    }
-    
-    /**
      *  Is debug logging currently enabled?
      *  Call this method to prevent having to perform expensive operations (for example, String concatenation) when the log level is more than debug.
      */
@@ -117,7 +127,8 @@ public class QueryLogger {
     }
     
     private boolean isLevelEnabled(LogLevel logLevel){
-        return this.logLevel != null ? logLevel.ordinal() <= this.logLevel.ordinal() : false;
+        return this.logLevel != null && logLevel != LogLevel.OFF ? logLevel.ordinal() <= this.logLevel.ordinal()
+                : false;
     }
     
     /**
@@ -142,4 +153,21 @@ public class QueryLogger {
         return this.queryId;
     }
     
+
+    public void sync(Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) {
+        if (!isSynced) {
+            isSynced = true;
+            final RingBufferEventTranslator translator = getCachedTranslator();
+            translator.setQueryInfo(logLevel, queryLogBuilder.build(), readMetrics, overAllMetrics);
+            publishLogs(translator);
+        }
+    }
+    
+    /**
+     * Is Synced already
+     */
+    public boolean isSynced(){
+        return this.isSynced;
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
index d5c4878..21917ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
@@ -25,48 +25,36 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.schema.PName;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
-
 public class QueryLoggerUtil {
 
+
     public static void logInitialDetails(QueryLogger queryLogger, PName tenantId, ConnectionQueryServices queryServices,
-            String query, long startTime, List<Object> bindParameters) {
+            String query, List<Object> bindParameters) {
         try {
-            queryLogger.log(QueryLogState.STARTED,
-                    getInitialDetails(tenantId, queryServices, query, startTime, bindParameters));
+            String clientIP;
+            try {
+                clientIP = InetAddress.getLocalHost().getHostAddress();
+            } catch (UnknownHostException e) {
+                clientIP = "UnknownHost";
+            }
+
+            if (clientIP != null) {
+                queryLogger.log(QueryLogInfo.CLIENT_IP_I, clientIP);
+            }
+            if (query != null) {
+                queryLogger.log(QueryLogInfo.QUERY_I, query);
+            }
+            if (bindParameters != null) {
+                queryLogger.log(QueryLogInfo.BIND_PARAMETERS_I, StringUtils.join(bindParameters, ","));
+            }
+            if (tenantId != null) {
+                queryLogger.log(QueryLogInfo.TENANT_ID_I, tenantId.getString());
+            }
+
+            queryLogger.log(QueryLogInfo.USER_I, queryServices.getUserName() != null ? queryServices.getUserName()
+                    : queryServices.getUser().getShortName());
         } catch (Exception e) {
-            // Ignore for now
-        }
-
-    }
-
-    private static ImmutableMap<QueryLogInfo, Object> getInitialDetails(PName tenantId,
-            ConnectionQueryServices queryServices, String query, long startTime, List<Object> bindParameters) {
-        Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
-        String clientIP;
-        try {
-            clientIP = InetAddress.getLocalHost().getHostAddress();
-        } catch (UnknownHostException e) {
-            clientIP = "UnknownHost";
+            // Ignore
         }
-
-        if (clientIP != null) {
-            queryLogBuilder.put(QueryLogInfo.CLIENT_IP_I, clientIP);
-        }
-        if (query != null) {
-            queryLogBuilder.put(QueryLogInfo.QUERY_I, query);
-        }
-        queryLogBuilder.put(QueryLogInfo.START_TIME_I, startTime);
-        if (bindParameters != null) {
-            queryLogBuilder.put(QueryLogInfo.BIND_PARAMETERS_I, StringUtils.join(bindParameters, ","));
-        }
-        if (tenantId != null) {
-            queryLogBuilder.put(QueryLogInfo.TENANT_ID_I, tenantId.getString());
-        }
-
-        queryLogBuilder.put(QueryLogInfo.USER_I, queryServices.getUserName() != null ? queryServices.getUserName()
-                : queryServices.getUser().getShortName());
-        return queryLogBuilder.build();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java
new file mode 100644
index 0000000..0e634c1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+public enum QueryStatus {
+    COMPILED, COMPLETED,FAILED 
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
index 96e4bf9..8854e68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
@@ -17,14 +17,19 @@
  */
 package org.apache.phoenix.log;
 
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.MetricType;
+
 import com.google.common.collect.ImmutableMap;
 import com.lmax.disruptor.EventFactory;
 
  class RingBufferEvent {
     private String queryId;
-    private QueryLogState logState;
     private LogLevel connectionLogLevel;
     private ImmutableMap<QueryLogInfo, Object> queryInfo;
+    private Map<String, Map<MetricType, Long>> readMetrics;
+    private Map<MetricType, Long> overAllMetrics;
     
     public static final Factory FACTORY = new Factory();
     
@@ -40,7 +45,6 @@ import com.lmax.disruptor.EventFactory;
     }
 
     public void clear() {
-        this.logState=null;
         this.queryInfo=null;
         this.queryId=null;
     }
@@ -53,10 +57,6 @@ import com.lmax.disruptor.EventFactory;
     public static Factory getFactory() {
         return FACTORY;
     }
-    
-    public QueryLogState getLogState() {
-        return logState;
-    }
 
     public void setQueryInfo(ImmutableMap<QueryLogInfo, Object> queryInfo) {
         this.queryInfo=queryInfo;
@@ -73,12 +73,6 @@ import com.lmax.disruptor.EventFactory;
         
     }
 
-    public void setLogState(QueryLogState logState) {
-        this.logState=logState;
-        
-    }
-
-
     public LogLevel getConnectionLogLevel() {
         return connectionLogLevel;
     }
@@ -88,6 +82,26 @@ import com.lmax.disruptor.EventFactory;
         this.connectionLogLevel = connectionLogLevel;
     }
 
+
+    public Map<String, Map<MetricType, Long>> getReadMetrics() {
+        return readMetrics;
+    }
+
+
+    public void setReadMetrics(Map<String, Map<MetricType, Long>> readMetrics) {
+        this.readMetrics = readMetrics;
+    }
+
+
+    public Map<MetricType, Long> getOverAllMetrics() {
+        return overAllMetrics;
+    }
+
+
+    public void setOverAllMetrics(Map<MetricType, Long> overAllMetrics) {
+        this.overAllMetrics = overAllMetrics;
+    }
+
     
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
index 653ddd6..742f8e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
@@ -17,14 +17,19 @@
  */
 package org.apache.phoenix.log;
 
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.MetricType;
+
 import com.google.common.collect.ImmutableMap;
 import com.lmax.disruptor.EventTranslator;
 
 class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> {
     private String queryId;
-    private QueryLogState logState;
     private ImmutableMap<QueryLogInfo, Object> queryInfo;
     private LogLevel connectionLogLevel;
+    private Map<String, Map<MetricType, Long>> readMetrics;
+    private Map<MetricType, Long> overAllMetrics;
     
     public RingBufferEventTranslator(String queryId) {
         this.queryId=queryId;
@@ -34,20 +39,22 @@ class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> {
     public void translateTo(RingBufferEvent event, long sequence) {
         event.setQueryId(queryId);
         event.setQueryInfo(queryInfo);
-        event.setLogState(logState);
+        event.setReadMetrics(readMetrics);
+        event.setOverAllMetrics(overAllMetrics);
         event.setConnectionLogLevel(connectionLogLevel);
         clear();
     }
 
     private void clear() {
-        setQueryInfo(null,null,null);
+        setQueryInfo(null,null,null,null);
     }
    
-    public void setQueryInfo(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> queryInfo,
-            LogLevel connectionLogLevel) {
+    public void setQueryInfo(LogLevel logLevel, ImmutableMap<QueryLogInfo, Object> queryInfo, Map<String, Map<MetricType, Long>> readMetrics,
+            Map<MetricType, Long> overAllMetrics) {
         this.queryInfo = queryInfo;
-        this.logState = logState;
-        this.connectionLogLevel = connectionLogLevel;
+        this.connectionLogLevel = logLevel;
+        this.readMetrics = readMetrics;
+        this.overAllMetrics=overAllMetrics;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
index c102855..0209951 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
@@ -21,23 +21,17 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCH
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
 
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.Map.Entry;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.expression.Determinism;
-import org.apache.phoenix.expression.LiteralExpression;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.util.QueryUtil;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -49,75 +43,111 @@ public class TableLogWriter implements LogWriter {
     private static final Log LOG = LogFactory.getLog(LogWriter.class);
     private Connection connection;
     private boolean isClosed;
-    private Table table;
+    private PreparedStatement upsertStatement;
     private Configuration config;
+    private Map<MetricType,Integer> metricOrdinals=new HashMap<MetricType,Integer>();
 
     public TableLogWriter(Configuration configuration) {
-        this.config = configuration;
-        try {
-            this.connection = ConnectionFactory.createConnection(configuration);
-            table = this.connection.getTable(SchemaUtil.getPhysicalTableName(
-                    SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_SCHEMA, SYSTEM_LOG_TABLE), config));
-        } catch (Exception e) {
-            LOG.warn("Unable to initiate LogWriter for writing query logs to table");
+        this.config=configuration;
+    }
+    
+    private PreparedStatement buildUpsertStatement(Connection conn) throws SQLException {
+        StringBuilder buf = new StringBuilder("UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"(");
+        int queryLogEntries=0;
+        for (QueryLogInfo info : QueryLogInfo.values()) {
+            buf.append(info.columnName);
+            buf.append(',');
+            queryLogEntries++;
+        }
+        for (MetricType metric : MetricType.values()) {
+            if (metric.logLevel() != LogLevel.OFF) {
+                metricOrdinals.put(metric, ++queryLogEntries);
+                buf.append(metric.columnName());
+                buf.append(',');
+            }
+        }
+        buf.setLength(buf.length()-1);
+        buf.append(") VALUES (");
+        for (int i = 0; i < QueryLogInfo.values().length; i++) {
+            buf.append("?,");
         }
+        for (MetricType metric : MetricType.values()) {
+            if (metric.logLevel() != LogLevel.OFF) {
+                buf.append("?,");
+            }
+        }
+        buf.setLength(buf.length()-1);
+        buf.append(")");
+        return conn.prepareStatement(buf.toString());
     }
 
     @Override
-    public void write(RingBufferEvent event) throws SQLException, IOException {
-        if(isClosed()){
+    public void write(RingBufferEvent event) throws SQLException, IOException, ClassNotFoundException {
+        if (isClosed()) {
             LOG.warn("Unable to commit query log as Log committer is already closed");
             return;
         }
-        if (table == null || connection == null) {
-            LOG.warn("Unable to commit query log as connection was not initiated ");
-            return;
+        if (connection == null) {
+            synchronized (this) {
+                if (connection == null) {
+                    connection = QueryUtil.getConnectionForQueryLog(this.config);
+                    this.upsertStatement = buildUpsertStatement(connection);
+                }
+            }
         }
-        ImmutableMap<QueryLogInfo, Object> queryInfo=event.getQueryInfo();
-        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        Put put =new Put(Bytes.toBytes(event.getQueryId()));
-        for (Entry<QueryLogInfo, Object> entry : queryInfo.entrySet()) {
-            if (entry.getKey().logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()) {
-                LiteralExpression expression = LiteralExpression.newConstant(entry.getValue(), entry.getKey().dataType,
-                        Determinism.ALWAYS);
-                expression.evaluate(null, ptr);
-                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes(entry.getKey().columnName),
-                        ByteUtil.copyKeyBytesIfNecessary(ptr));
+
+        ImmutableMap<QueryLogInfo, Object> queryInfoMap = event.getQueryInfo();
+        for (QueryLogInfo info : QueryLogInfo.values()) {
+            if (queryInfoMap.containsKey(info) && info.logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()) {
+                upsertStatement.setObject(info.ordinal() + 1, queryInfoMap.get(info));
+            } else {
+                upsertStatement.setObject(info.ordinal() + 1, null);
             }
         }
-        
-        if (QueryLogInfo.QUERY_STATUS_I.logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()
-                && (event.getLogState() == QueryLogState.COMPLETED || event.getLogState() == QueryLogState.FAILED)) {
-            LiteralExpression expression = LiteralExpression.newConstant(event.getLogState().toString(),
-                    QueryLogInfo.QUERY_STATUS_I.dataType, Determinism.ALWAYS);
-            expression.evaluate(null, ptr);
-            put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                    Bytes.toBytes(QueryLogInfo.QUERY_STATUS_I.columnName), ByteUtil.copyKeyBytesIfNecessary(ptr));
+        Map<MetricType, Long> overAllMetrics = event.getOverAllMetrics();
+        Map<String, Map<MetricType, Long>> readMetrics = event.getReadMetrics();
+
+        for (MetricType metric : MetricType.values()) {
+            if (overAllMetrics != null && overAllMetrics.containsKey(metric)
+                    && metric.isLoggingEnabled(event.getConnectionLogLevel())) {
+                upsertStatement.setObject(metricOrdinals.get(metric), overAllMetrics.get(metric));
+            } else {
+                if (metric.logLevel() != LogLevel.OFF) {
+                    upsertStatement.setObject(metricOrdinals.get(metric), null);
+                }
+            }
+        }
+
+        if (readMetrics != null && !readMetrics.isEmpty()) {
+            for (Map.Entry<String, Map<MetricType, Long>> entry : readMetrics.entrySet()) {
+                upsertStatement.setObject(QueryLogInfo.TABLE_NAME_I.ordinal() + 1, entry.getKey());
+                for (MetricType metric : entry.getValue().keySet()) {
+                    if (metric.isLoggingEnabled(event.getConnectionLogLevel())) {
+                        upsertStatement.setObject(metricOrdinals.get(metric), entry.getValue().get(metric));
+                    }
+                }
+                upsertStatement.executeUpdate();
+            }
+        } else {
+            upsertStatement.executeUpdate();
         }
-        put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
-        table.put(put);
-        
+        connection.commit();
     }
     
     @Override
     public void close() throws IOException {
-        if(isClosed()){
-            return;
-        }
-        isClosed=true;
+        if (isClosed()) { return; }
+        isClosed = true;
         try {
-            if (table != null) {
-                table.close();
-            }
-            if (connection != null && !connection.isClosed()) {
-                //It should internally close all the statements
+            if (connection != null) {
+                // It should internally close all the statements
                 connection.close();
             }
-        } catch (IOException e) {
+        } catch (SQLException e) {
             // TODO Ignore?
         }
     }
-    
+
     public boolean isClosed(){
         return isClosed;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index ec1b451..58c048b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -35,7 +35,15 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.iterate.*;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.RoundRobinResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.iterate.TableSnapshotResultIterator;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
@@ -112,7 +120,6 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
             services.clearTableRegionCache(tableNameBytes);
 
             long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
-            boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
             for (Scan scan : scans) {
                 // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
                 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
@@ -120,7 +127,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
                 PeekingResultIterator peekingResultIterator;
                 ScanMetricsHolder scanMetricsHolder =
                   ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
-                      isRequestMetricsEnabled);
+                      queryPlan.getContext().getConnection().getLogLevel());
                 if (snapshotName != null) {
                   // result iterator to read snapshots
                   final TableSnapshotResultIterator tableSnapshotResultIterator = new TableSnapshotResultIterator(configuration, scan,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
index 0e82ce4..daa0bba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
@@ -26,7 +26,6 @@ import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
 public class MemoryMetricsHolder {
     private final CombinableMetric memoryChunkSizeMetric;
     private final CombinableMetric memoryWaitTimeMetric;
-    public static final MemoryMetricsHolder NO_OP_INSTANCE = new MemoryMetricsHolder(new ReadMetricQueue(false), null);
     
     public MemoryMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
         this.memoryChunkSizeMetric = readMetrics.allotMetric(MEMORY_CHUNK_BYTES, tableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index ef6eceb..8e1de66 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -17,62 +17,74 @@
  */
 package org.apache.phoenix.monitoring;
 
+import org.apache.phoenix.log.LogLevel;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PLong;
+
+
+/**
+ * Keeping {@link LogLevel#OFF} for metrics which are calculated globally only and doesn't need to be logged in SYSTEM.LOG 
+ */
 public enum MetricType {
 
-	NO_OP_METRIC("no", "No op metric"),
+	NO_OP_METRIC("no", "No op metric",LogLevel.OFF, PLong.INSTANCE),
 	// mutation (write) related metrics 
-    MUTATION_BATCH_SIZE("ms", "Number of mutations in the batch"),
-    MUTATION_BYTES("mb", "Size of mutations in bytes"),
-    MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of mutations"),
-    MUTATION_BATCH_FAILED_SIZE("mfs", "Number of mutations that failed to be committed"),
-    MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql statements"),
+    MUTATION_BATCH_SIZE("ms", "Number of mutations in the batch",LogLevel.OFF, PLong.INSTANCE),
+    MUTATION_BYTES("mb", "Size of mutations in bytes",LogLevel.OFF, PLong.INSTANCE),
+    MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of mutations",LogLevel.OFF, PLong.INSTANCE),
+    MUTATION_BATCH_FAILED_SIZE("mfs", "Number of mutations that failed to be committed",LogLevel.OFF, PLong.INSTANCE),
+    MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql statements",LogLevel.OFF, PLong.INSTANCE),
     // query (read) related metrics
-    QUERY_TIME("qt", "Query times"),
-    QUERY_TIMEOUT_COUNTER("qo", "Number of times query timed out"),
-    QUERY_FAILED_COUNTER("qf", "Number of times query failed"),
-    NUM_PARALLEL_SCANS("ps", "Number of scans that were executed in parallel"),
-    SCAN_BYTES("sb", "Number of bytes read by scans"),
-    SELECT_SQL_COUNTER("sc", "Counter for number of sql queries"),
+    QUERY_TIME("qt", "Query times",LogLevel.OFF, PLong.INSTANCE),
+    QUERY_TIMEOUT_COUNTER("qo", "Number of times query timed out",LogLevel.DEBUG, PLong.INSTANCE),
+    QUERY_FAILED_COUNTER("qf", "Number of times query failed",LogLevel.DEBUG, PLong.INSTANCE),
+    NUM_PARALLEL_SCANS("ps", "Number of scans that were executed in parallel",LogLevel.DEBUG, PLong.INSTANCE),
+    SCAN_BYTES("sb", "Number of bytes read by scans",LogLevel.OFF, PLong.INSTANCE),
+    SELECT_SQL_COUNTER("sc", "Counter for number of sql queries",LogLevel.OFF, PLong.INSTANCE),
     // task metrics
-    TASK_QUEUE_WAIT_TIME("tw", "Time in milliseconds tasks had to wait in the queue of the thread pool executor"),
-    TASK_END_TO_END_TIME("tee", "Time in milliseconds spent by tasks from creation to completion"),
-    TASK_EXECUTION_TIME("tx", "Time in milliseconds tasks took to execute"),
-    TASK_EXECUTED_COUNTER("te", "Counter for number of tasks submitted to the thread pool executor"),
-    TASK_REJECTED_COUNTER("tr", "Counter for number of tasks that were rejected by the thread pool executor"),
+    TASK_QUEUE_WAIT_TIME("tw", "Time in milliseconds tasks had to wait in the queue of the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE),
+    TASK_END_TO_END_TIME("tee", "Time in milliseconds spent by tasks from creation to completion",LogLevel.DEBUG, PLong.INSTANCE),
+    TASK_EXECUTION_TIME("tx", "Time in milliseconds tasks took to execute",LogLevel.DEBUG, PLong.INSTANCE),
+    TASK_EXECUTED_COUNTER("te", "Counter for number of tasks submitted to the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE),
+    TASK_REJECTED_COUNTER("tr", "Counter for number of tasks that were rejected by the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE),
     // spool metrics
-    SPOOL_FILE_SIZE("ss", "Size of spool files created in bytes"),
-    SPOOL_FILE_COUNTER("sn", "Number of spool files created"),
+    SPOOL_FILE_SIZE("ss", "Size of spool files created in bytes",LogLevel.DEBUG, PLong.INSTANCE),
+    SPOOL_FILE_COUNTER("sn", "Number of spool files created",LogLevel.DEBUG, PLong.INSTANCE),
     // misc metrics
-    MEMORY_CHUNK_BYTES("mc", "Number of bytes allocated by the memory manager"),
-    MEMORY_WAIT_TIME("mw", "Number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
-    CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits"),
-    WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution"),
-    RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()"),
-    OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections"),
-    QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated"),
-    HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver"),
+    MEMORY_CHUNK_BYTES("mc", "Number of bytes allocated by the memory manager",LogLevel.DEBUG, PLong.INSTANCE),
+    MEMORY_WAIT_TIME("mw", "Number of milliseconds threads needed to wait for memory to be allocated through memory manager",LogLevel.DEBUG, PLong.INSTANCE),
+    CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits",LogLevel.DEBUG, PLong.INSTANCE),
+    WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE),
+    RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE),
+    OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE),
+    QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE),
+    HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver",LogLevel.OFF, PLong.INSTANCE),
     PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix connections prevented from opening " +
-                                              "because there are already too many to that target cluster."),
-    PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix connections, whether successful or not."),
+                                              "because there are already too many to that target cluster.",LogLevel.OFF, PLong.INSTANCE),
+    PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix connections, whether successful or not.",LogLevel.OFF, PLong.INSTANCE),
     // hbase metrics
-    COUNT_RPC_CALLS("rp", "Number of RPC calls"),
-    COUNT_REMOTE_RPC_CALLS("rr", "Number of remote RPC calls"),
-    COUNT_MILLS_BETWEEN_NEXTS("n", "Sum of milliseconds between sequential next calls"),
-    COUNT_NOT_SERVING_REGION_EXCEPTION("nsr", "Number of NotServingRegionException caught"),
-    COUNT_BYTES_REGION_SERVER_RESULTS("rs", "Number of bytes in Result objects from region servers"),
-    COUNT_BYTES_IN_REMOTE_RESULTS("rrs", "Number of bytes in Result objects from remote region servers"),
-    COUNT_SCANNED_REGIONS("rg", "Number of regions scanned"),
-    COUNT_RPC_RETRIES("rpr", "Number of RPC retries"),
-    COUNT_REMOTE_RPC_RETRIES("rrr", "Number of remote RPC retries"),
-    COUNT_ROWS_SCANNED("ws", "Number of rows scanned"),
-    COUNT_ROWS_FILTERED("wf", "Number of rows filtered");
+    COUNT_RPC_CALLS("rp", "Number of RPC calls",LogLevel.DEBUG, PLong.INSTANCE),
+    COUNT_REMOTE_RPC_CALLS("rr", "Number of remote RPC calls",LogLevel.DEBUG, PLong.INSTANCE),
+    COUNT_MILLS_BETWEEN_NEXTS("n", "Sum of milliseconds between sequential next calls",LogLevel.DEBUG, PLong.INSTANCE),
+    COUNT_NOT_SERVING_REGION_EXCEPTION("nsr", "Number of NotServingRegionException caught",LogLevel.DEBUG, PLong.INSTANCE),
+    COUNT_BYTES_REGION_SERVER_RESULTS("rs", "Number of bytes in Result objects from region servers",LogLevel.DEBUG, PLong.INSTANCE),
+    COUNT_BYTES_IN_REMOTE_RESULTS("rrs", "Number of bytes in Result objects from remote region servers",LogLevel.DEBUG, PLong.INSTANCE),
+    COUNT_SCANNED_REGIONS("rg", "Number of regions scanned",LogLevel.DEBUG, PLong.INSTANCE),
+    COUNT_RPC_RETRIES("rpr", "Number of RPC retries",LogLevel.DEBUG, PLong.INSTANCE),
+    COUNT_REMOTE_RPC_RETRIES("rrr", "Number of remote RPC retries",LogLevel.DEBUG, PLong.INSTANCE),
+    COUNT_ROWS_SCANNED("ws", "Number of rows scanned",LogLevel.DEBUG, PLong.INSTANCE),
+    COUNT_ROWS_FILTERED("wf", "Number of rows filtered",LogLevel.DEBUG,PLong.INSTANCE);
 	
     private final String description;
     private final String shortName;
+    private LogLevel logLevel;
+    private PDataType dataType;
 
-    private MetricType(String shortName, String description) {
+    private MetricType(String shortName, String description, LogLevel logLevel, PDataType dataType) {
     	this.shortName = shortName;
         this.description = description;
+        this.logLevel=logLevel;
+        this.dataType=dataType;
     }
 
     public String description() {
@@ -82,5 +94,34 @@ public enum MetricType {
     public String shortName() {
         return shortName;
     }
+    
+    public LogLevel logLevel() {
+        return logLevel;
+    }
+    
+    public PDataType dataType() {
+        return dataType;
+    }
+    
+    public String columnName() {
+        return name();
+    }
+    
+    public boolean isLoggingEnabled(LogLevel connectionLogLevel){
+        return logLevel() != LogLevel.OFF && (logLevel().ordinal() <= connectionLogLevel.ordinal());
+    }
 
+    public static String getMetricColumnsDetails() {
+        StringBuilder buffer=new StringBuilder();
+        for(MetricType metric:MetricType.values()){
+            if (metric.logLevel() != LogLevel.OFF) {
+                buffer.append(metric.columnName());
+                buffer.append(" ");
+                buffer.append(metric.dataType.getSqlTypeName());
+                buffer.append(",");
+            }
+        }
+        return buffer.toString();
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java
new file mode 100644
index 0000000..1e5ac08
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.phoenix.log.LogLevel;
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+
+public class MetricUtil {
+
+    public static CombinableMetric getCombinableMetric(LogLevel connectionLogLevel, MetricType type) {
+        if (!type.isLoggingEnabled(connectionLogLevel)) { return NoOpRequestMetric.INSTANCE; }
+        return new CombinableMetricImpl(type);
+    }
+
+}