You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2018/05/14 20:33:30 UTC
[1/2] phoenix git commit: PHOENIX-4701 Write client-side metrics
asynchronously to SYSTEM.LOG
Repository: phoenix
Updated Branches:
refs/heads/master 1966edb19 -> 7afaceb7e
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
index 3de2be1..1256f5c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -27,6 +27,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.phoenix.log.LogLevel;
+
/**
* Queue that tracks various writes/mutations related phoenix request metrics.
*/
@@ -81,12 +83,16 @@ public class MutationMetricQueue {
* Class that holds together the various metrics associated with mutations.
*/
public static class MutationMetric {
- private final CombinableMetric numMutations = new CombinableMetricImpl(MUTATION_BATCH_SIZE);
- private final CombinableMetric mutationsSizeBytes = new CombinableMetricImpl(MUTATION_BYTES);
- private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME);
- private final CombinableMetric numFailedMutations = new CombinableMetricImpl(MUTATION_BATCH_FAILED_SIZE);
-
- public MutationMetric(long numMutations, long mutationsSizeBytes, long commitTimeForMutations, long numFailedMutations) {
+ private final CombinableMetric numMutations;;
+ private final CombinableMetric mutationsSizeBytes;
+ private final CombinableMetric totalCommitTimeForMutations;
+ private final CombinableMetric numFailedMutations;
+
+ public MutationMetric(LogLevel connectionLogLevel, long numMutations, long mutationsSizeBytes, long commitTimeForMutations, long numFailedMutations) {
+ this.numMutations = MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BATCH_SIZE);
+ this.mutationsSizeBytes =MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BYTES);
+ this.totalCommitTimeForMutations =MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_COMMIT_TIME);
+ this.numFailedMutations = MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BATCH_FAILED_SIZE);
this.numMutations.change(numMutations);
this.mutationsSizeBytes.change(mutationsSizeBytes);
this.totalCommitTimeForMutations.change(commitTimeForMutations);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
index b995267..3121ecd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
@@ -27,7 +27,7 @@ import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_TIME_MS;
import java.util.HashMap;
import java.util.Map;
-import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+import org.apache.phoenix.log.LogLevel;
/**
* Class that represents the overall metrics associated with a query being executed by the phoenix.
@@ -42,16 +42,15 @@ public class OverAllQueryMetrics {
private final CombinableMetric queryFailed;
private final CombinableMetric cacheRefreshedDueToSplits;
- public OverAllQueryMetrics(boolean isMetricsEnabled) {
- queryWatch = new MetricsStopWatch(isMetricsEnabled);
- resultSetWatch = new MetricsStopWatch(isMetricsEnabled);
- numParallelScans = isMetricsEnabled ? new CombinableMetricImpl(NUM_PARALLEL_SCANS) : NoOpRequestMetric.INSTANCE;
- wallClockTimeMS = isMetricsEnabled ? new CombinableMetricImpl(WALL_CLOCK_TIME_MS) : NoOpRequestMetric.INSTANCE;
- resultSetTimeMS = isMetricsEnabled ? new CombinableMetricImpl(RESULT_SET_TIME_MS) : NoOpRequestMetric.INSTANCE;
- queryTimedOut = isMetricsEnabled ? new CombinableMetricImpl(QUERY_TIMEOUT_COUNTER) : NoOpRequestMetric.INSTANCE;
- queryFailed = isMetricsEnabled ? new CombinableMetricImpl(QUERY_FAILED_COUNTER) : NoOpRequestMetric.INSTANCE;
- cacheRefreshedDueToSplits = isMetricsEnabled ? new CombinableMetricImpl(CACHE_REFRESH_SPLITS_COUNTER)
- : NoOpRequestMetric.INSTANCE;
+ public OverAllQueryMetrics(LogLevel connectionLogLevel) {
+ queryWatch = new MetricsStopWatch(WALL_CLOCK_TIME_MS.isLoggingEnabled(connectionLogLevel));
+ resultSetWatch = new MetricsStopWatch(RESULT_SET_TIME_MS.isLoggingEnabled(connectionLogLevel));
+ numParallelScans = MetricUtil.getCombinableMetric(connectionLogLevel, NUM_PARALLEL_SCANS);
+ wallClockTimeMS = MetricUtil.getCombinableMetric(connectionLogLevel, WALL_CLOCK_TIME_MS);
+ resultSetTimeMS = MetricUtil.getCombinableMetric(connectionLogLevel, RESULT_SET_TIME_MS);
+ queryTimedOut = MetricUtil.getCombinableMetric(connectionLogLevel, QUERY_TIMEOUT_COUNTER);
+ queryFailed = MetricUtil.getCombinableMetric(connectionLogLevel, QUERY_FAILED_COUNTER);
+ cacheRefreshedDueToSplits = MetricUtil.getCombinableMetric(connectionLogLevel, CACHE_REFRESH_SPLITS_COUNTER);
}
public void updateNumParallelScans(long numParallelScans) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
index c008635..fa81e9c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
@@ -25,6 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nonnull;
+import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
import com.google.common.annotations.VisibleForTesting;
@@ -40,19 +41,23 @@ public class ReadMetricQueue {
private final List<ScanMetricsHolder> scanMetricsHolderList = new ArrayList<ScanMetricsHolder>();
- private final boolean isRequestMetricsEnabled;
- public ReadMetricQueue(boolean isRequestMetricsEnabled) {
- this.isRequestMetricsEnabled = isRequestMetricsEnabled;
+ private LogLevel connectionLogLevel;
+
+ public ReadMetricQueue(LogLevel connectionLogLevel) {
+ this.connectionLogLevel = connectionLogLevel;
}
public CombinableMetric allotMetric(MetricType type, String tableName) {
- if (!isRequestMetricsEnabled) { return NoOpRequestMetric.INSTANCE; }
- MetricKey key = new MetricKey(type, tableName);
- Queue<CombinableMetric> q = getMetricQueue(key);
- CombinableMetric metric = getMetric(type);
- q.offer(metric);
- return metric;
+ if (type.isLoggingEnabled(connectionLogLevel)) {
+ MetricKey key = new MetricKey(type, tableName);
+ Queue<CombinableMetric> q = getMetricQueue(key);
+ CombinableMetric metric = getMetric(type);
+ q.offer(metric);
+ return metric;
+ } else {
+ return NoOpRequestMetric.INSTANCE;
+ }
}
@VisibleForTesting
@@ -173,10 +178,6 @@ public class ReadMetricQueue {
return q;
}
- public boolean isRequestMetricsEnabled() {
- return isRequestMetricsEnabled;
- }
-
public void addScanHolder(ScanMetricsHolder holder){
scanMetricsHolderList.add(holder);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
index 9125cd8..494b3e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
@@ -34,6 +34,7 @@ import java.util.Map;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.JsonMapper;
+import org.apache.phoenix.log.LogLevel;
public class ScanMetricsHolder {
@@ -52,13 +53,11 @@ public class ScanMetricsHolder {
private Object scan;
private static final ScanMetricsHolder NO_OP_INSTANCE =
- new ScanMetricsHolder(new ReadMetricQueue(false), "",null);
+ new ScanMetricsHolder(new ReadMetricQueue(LogLevel.OFF), "",null);
public static ScanMetricsHolder getInstance(ReadMetricQueue readMetrics, String tableName,
- Scan scan, boolean isRequestMetricsEnabled) {
- if (!isRequestMetricsEnabled) {
- return NO_OP_INSTANCE;
- }
+ Scan scan, LogLevel connectionLogLevel) {
+ if (connectionLogLevel == LogLevel.OFF) { return NO_OP_INSTANCE; }
scan.setScanMetricsEnabled(true);
return new ScanMetricsHolder(readMetrics, tableName, scan);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
index 4373887..699982f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.monitoring;
+import org.apache.phoenix.log.LogLevel;
/**
* Class that encapsulates the various metrics associated with the spooling done by phoenix as part of servicing a
@@ -26,7 +27,7 @@ public class SpoolingMetricsHolder {
private final CombinableMetric spoolFileSizeMetric;
private final CombinableMetric numSpoolFileMetric;
- public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(false), "");
+ public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(LogLevel.OFF), "");
public SpoolingMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
this.spoolFileSizeMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_SIZE, tableName);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
index 98ff57c..6117b40 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
@@ -23,6 +23,8 @@ import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
+import org.apache.phoenix.log.LogLevel;
+
/**
* Class to encapsulate the various metrics associated with submitting and executing a task to the phoenix client
@@ -35,7 +37,7 @@ public class TaskExecutionMetricsHolder {
private final CombinableMetric taskExecutionTime;
private final CombinableMetric numTasks;
private final CombinableMetric numRejectedTasks;
- public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(false), "");
+ public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(LogLevel.OFF), "");
public TaskExecutionMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
taskQueueWaitTime = readMetrics.allotMetric(TASK_QUEUE_WAIT_TIME, tableName);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 1d3feed..8b7b708 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -2471,7 +2471,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// Available for testing
protected String getLogTableDDL() {
- return QueryConstants.CREATE_LOG_METADATA;
+ return setSystemLogDDLProperties(QueryConstants.CREATE_LOG_METADATA);
+ }
+
+ private String setSystemLogDDLProperties(String ddl) {
+ return String.format(ddl, props.getInt(LOG_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_LOG_SALT_BUCKETS));
+
}
private String setSystemDDLProperties(String ddl) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 3531a87..a7fad8e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -164,7 +164,12 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
}
protected String getLogTableDDL() {
- return QueryConstants.CREATE_LOG_METADATA;
+ return setSystemLogDDLProperties(QueryConstants.CREATE_LOG_METADATA);
+ }
+
+ private String setSystemLogDDLProperties(String ddl) {
+ return String.format(ddl, props.getInt(LOG_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_LOG_SALT_BUCKETS));
+
}
private String setSystemDDLProperties(String ddl) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 65806ae..52abed0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -18,112 +18,7 @@
package org.apache.phoenix.query;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BUFFER_LENGTH;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHAR_OCTET_LENGTH;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POST_KEY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITERATED;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_PREC_RADIX;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REF_GENERATION;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REMARKS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_CATALOG;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SELF_REFERENCING_COL_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SEQUENCE_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SEQUENCE_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SOURCE_DATA_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATA_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.*;
import java.math.BigDecimal;
@@ -134,9 +29,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.schema.MetaDataSplitPolicy;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.SystemFunctionSplitPolicy;
@@ -412,28 +309,31 @@ public interface QueryConstants {
PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
public static final String CREATE_LOG_METADATA =
- "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"(\n" +
+ "CREATE IMMUTABLE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"(\n" +
// Pk columns
+ START_TIME + " DECIMAL, \n" +
+ TABLE_NAME + " VARCHAR, \n" +
+ QUERY_ID + " VARCHAR NOT NULL,\n" +
TENANT_ID + " VARCHAR ," +
- QUERY_ID + " VARCHAR NOT NULL,\n" +
USER + " VARCHAR , \n" +
CLIENT_IP + " VARCHAR, \n" +
// Function metadata (will be null for argument row)
QUERY + " VARCHAR, \n" +
EXPLAIN_PLAN + " VARCHAR, \n" +
// Argument metadata (will be null for function row)
- START_TIME + " TIMESTAMP, \n" +
- TOTAL_EXECUTION_TIME + " BIGINT, \n" +
NO_OF_RESULTS_ITERATED + " BIGINT, \n" +
QUERY_STATUS + " VARCHAR, \n" +
EXCEPTION_TRACE + " VARCHAR, \n" +
GLOBAL_SCAN_DETAILS + " VARCHAR, \n" +
BIND_PARAMETERS + " VARCHAR, \n" +
SCAN_METRICS_JSON + " VARCHAR, \n" +
- " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (QUERY_ID))\n" +
+ MetricType.getMetricColumnsDetails()+"\n"+
+ " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (START_TIME, TABLE_NAME, QUERY_ID))\n" +
+ PhoenixDatabaseMetaData.SALT_BUCKETS + "=%s,\n"+
PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE+ ",\n" +
HColumnDescriptor.TTL + "=" + MetaDataProtocol.DEFAULT_LOG_TTL+",\n"+
- PhoenixDatabaseMetaData.COLUMN_ENCODED_BYTES +" = 0";
+ TableProperty.IMMUTABLE_STORAGE_SCHEME.toString() + " = " + ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS.name() + ",\n" +
+ TableProperty.COLUMN_ENCODED_BYTES.toString()+" = 1";
public static final byte[] OFFSET_FAMILY = "f_offset".getBytes();
public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index db0b10b..559d165 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -197,6 +197,7 @@ public interface QueryServices extends SQLCloseable {
public static final String COMMIT_STATS_ASYNC = "phoenix.stats.commit.async";
// Maximum size in bytes taken up by cached table stats in the client
public static final String STATS_MAX_CACHE_SIZE = "phoenix.stats.cache.maxSize";
+ public static final String LOG_SALT_BUCKETS_ATTRIB = "phoenix.log.saltBuckets";
public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets";
public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 0e6e89f..d708785 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -356,6 +356,7 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = false;
public static final String DEFAULT_LOGGING_LEVEL = LogLevel.OFF.name();
public static final String DEFAULT_LOG_SAMPLE_RATE = "1.0";
+ public static final int DEFAULT_LOG_SALT_BUCKETS = 32;
private final Configuration config;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 20ac732..8f14680 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -2286,7 +2286,7 @@ public class MetaDataClient {
}
}
// System tables have hard-coded column qualifiers. So we can't use column encoding for them.
- else if (!SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, tableName)))) {
+ else if (!SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, tableName)))|| SchemaUtil.isLogTable(schemaName, tableName)) {
/*
* Indexes inherit the storage scheme of the parent data tables. Otherwise, we always attempt to
* create tables with encoded column names.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 9d2e53c..94cbfea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -436,7 +436,7 @@ public final class QueryUtil {
}
return url;
}
-
+
private static int getInt(String key, int defaultValue, Properties props, Configuration conf) {
if (conf == null) {
Preconditions.checkNotNull(props);
@@ -487,5 +487,10 @@ public final class QueryUtil {
public static String getViewPartitionClause(String partitionColumnName, long autoPartitionNum) {
return partitionColumnName + " " + toSQL(CompareOp.EQUAL) + " " + autoPartitionNum;
}
+
+ public static Connection getConnectionForQueryLog(Configuration config) throws ClassNotFoundException, SQLException {
+ //we don't need this connection to upgrade anything or start dispatcher
+ return getConnectionOnServer(config);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 92a2cde..dd00a69 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -1170,6 +1170,10 @@ public class SchemaUtil {
Cell isNamespaceMappedCell = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
return isNamespaceMappedCell!=null && (boolean) PBoolean.INSTANCE.toObject(isNamespaceMappedCell.getValue());
}
+
+ public static boolean isLogTable(String schemaName, String tableName) {
+ return PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(schemaName) && PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE.equals(tableName);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
index e0a731d..d858e72 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
@@ -24,10 +24,12 @@ import java.util.Arrays;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.memory.DelegatingMemoryManager;
import org.apache.phoenix.memory.GlobalMemoryManager;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.monitoring.MemoryMetricsHolder;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.SpoolingMetricsHolder;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
@@ -54,7 +56,10 @@ public class SpoolingResultIteratorTest {
};
MemoryManager memoryManager = new DelegatingMemoryManager(new GlobalMemoryManager(threshold));
- ResultIterator scanner = new SpoolingResultIterator(SpoolingMetricsHolder.NO_OP_INSTANCE, MemoryMetricsHolder.NO_OP_INSTANCE, iterator, memoryManager, threshold, maxSizeSpool,"/tmp");
+ ResultIterator scanner = new SpoolingResultIterator(
+ SpoolingMetricsHolder.NO_OP_INSTANCE,
+ new MemoryMetricsHolder(new ReadMetricQueue(LogLevel.OFF), ""), iterator, memoryManager, threshold,
+ maxSizeSpool, "/tmp");
AssertResults.assertResults(scanner, expectedResults);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
index c10c4d1..2ff8aca 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
@@ -112,11 +112,10 @@ public class PhoenixRecordReader<T extends DBWritable> implements
String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
long renewScannerLeaseThreshold = queryPlan.getContext().getConnection()
.getQueryServices().getRenewLeaseThresholdMilliSeconds();
- boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
for (Scan scan : scans) {
scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes
.toBytes(true));
- ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan, isRequestMetricsEnabled);
+ ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan, ctx.getConnection().getLogLevel());
final TableResultIterator tableResultIterator = new TableResultIterator(
queryPlan.getContext().getConnection().getMutationState(), scan, scanMetricsHolder,
renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance());
[2/2] phoenix git commit: PHOENIX-4701 Write client-side metrics
asynchronously to SYSTEM.LOG
Posted by an...@apache.org.
PHOENIX-4701 Write client-side metrics asynchronously to SYSTEM.LOG
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7afaceb7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7afaceb7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7afaceb7
Branch: refs/heads/master
Commit: 7afaceb7e7355e59ae9465a02b812b230fc58edd
Parents: 1966edb
Author: Ankit Singhal <an...@gmail.com>
Authored: Mon May 14 13:33:22 2018 -0700
Committer: Ankit Singhal <an...@gmail.com>
Committed: Mon May 14 13:33:22 2018 -0700
----------------------------------------------------------------------
bin/hbase-site.xml | 4 +
.../apache/phoenix/end2end/QueryLoggerIT.java | 76 ++++++----
.../phoenix/monitoring/PhoenixMetricsIT.java | 28 ++--
.../phoenix/compile/StatementContext.java | 4 +-
.../apache/phoenix/execute/MutationState.java | 4 +-
.../phoenix/iterate/ChunkedResultIterator.java | 2 +-
.../phoenix/iterate/ParallelIterators.java | 3 +-
.../apache/phoenix/iterate/SerialIterators.java | 3 +-
.../apache/phoenix/jdbc/PhoenixConnection.java | 10 +-
.../apache/phoenix/jdbc/PhoenixResultSet.java | 54 ++++---
.../apache/phoenix/jdbc/PhoenixStatement.java | 21 ++-
.../java/org/apache/phoenix/log/LogLevel.java | 2 +-
.../java/org/apache/phoenix/log/LogWriter.java | 6 +-
.../org/apache/phoenix/log/QueryLogInfo.java | 38 +++--
.../org/apache/phoenix/log/QueryLogState.java | 22 ---
.../org/apache/phoenix/log/QueryLogger.java | 74 +++++++---
.../org/apache/phoenix/log/QueryLoggerUtil.java | 62 ++++----
.../org/apache/phoenix/log/QueryStatus.java | 22 +++
.../org/apache/phoenix/log/RingBufferEvent.java | 38 +++--
.../phoenix/log/RingBufferEventTranslator.java | 21 ++-
.../org/apache/phoenix/log/TableLogWriter.java | 144 +++++++++++--------
.../phoenix/mapreduce/PhoenixRecordReader.java | 13 +-
.../phoenix/monitoring/MemoryMetricsHolder.java | 1 -
.../apache/phoenix/monitoring/MetricType.java | 123 ++++++++++------
.../apache/phoenix/monitoring/MetricUtil.java | 30 ++++
.../phoenix/monitoring/MutationMetricQueue.java | 18 ++-
.../phoenix/monitoring/OverAllQueryMetrics.java | 21 ++-
.../phoenix/monitoring/ReadMetricQueue.java | 27 ++--
.../phoenix/monitoring/ScanMetricsHolder.java | 9 +-
.../monitoring/SpoolingMetricsHolder.java | 3 +-
.../monitoring/TaskExecutionMetricsHolder.java | 4 +-
.../query/ConnectionQueryServicesImpl.java | 7 +-
.../query/ConnectionlessQueryServicesImpl.java | 7 +-
.../apache/phoenix/query/QueryConstants.java | 124 ++--------------
.../org/apache/phoenix/query/QueryServices.java | 1 +
.../phoenix/query/QueryServicesOptions.java | 1 +
.../apache/phoenix/schema/MetaDataClient.java | 2 +-
.../java/org/apache/phoenix/util/QueryUtil.java | 7 +-
.../org/apache/phoenix/util/SchemaUtil.java | 4 +
.../iterate/SpoolingResultIteratorTest.java | 7 +-
.../hive/mapreduce/PhoenixRecordReader.java | 3 +-
41 files changed, 570 insertions(+), 480 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/bin/hbase-site.xml
----------------------------------------------------------------------
diff --git a/bin/hbase-site.xml b/bin/hbase-site.xml
index 0ab9fd8..2f360e2 100644
--- a/bin/hbase-site.xml
+++ b/bin/hbase-site.xml
@@ -24,4 +24,8 @@
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
+ <property>
+ <name>phoenix.log.level</name>
+ <value>DEBUG</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
index 940ba6f..618d7d9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
@@ -31,7 +31,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -53,8 +52,10 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.log.LogLevel;
-import org.apache.phoenix.log.QueryLogState;
+import org.apache.phoenix.log.QueryStatus;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
@@ -77,6 +78,19 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
DriverManager.registerDriver(PhoenixDriver.INSTANCE);
}
+ private static class MyClock extends EnvironmentEdge {
+ public volatile long time;
+
+ public MyClock (long time) {
+ this.time = time;
+ }
+
+ @Override
+ public long currentTime() {
+ return time;
+ }
+ }
+
@Test
public void testDebugLogs() throws Exception {
@@ -97,12 +111,13 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
ResultSet explainRS = conn.createStatement().executeQuery("Explain " + query);
String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
- rs = conn.createStatement().executeQuery(logQuery);
- boolean foundQueryLog = false;
int delay = 5000;
// sleep for sometime to let query log committed
Thread.sleep(delay);
+ rs = conn.createStatement().executeQuery(logQuery);
+ boolean foundQueryLog = false;
+
while (rs.next()) {
if (rs.getString(QUERY_ID).equals(queryId)) {
foundQueryLog = true;
@@ -113,11 +128,9 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON());
assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10);
assertEquals(rs.getString(QUERY), query);
- assertEquals(rs.getString(QUERY_STATUS), QueryLogState.COMPLETED.toString());
- assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay);
+ assertEquals(rs.getString(QUERY_STATUS), QueryStatus.COMPLETED.toString());
assertEquals(rs.getString(TENANT_ID), null);
- assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
- assertTrue(rs.getString(SCAN_METRICS_JSON).contains("scanMetrics"));
+ assertTrue(rs.getString(SCAN_METRICS_JSON)==null);
assertEquals(rs.getString(EXCEPTION_TRACE),null);
}else{
//confirm we are not logging system queries
@@ -140,7 +153,10 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
String query = "SELECT * FROM " + tableName;
int count=100;
for (int i = 0; i < count; i++) {
- conn.createStatement().executeQuery(query);
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ while(rs.next()){
+
+ }
}
String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
@@ -178,12 +194,12 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
}
String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
- rs = conn.createStatement().executeQuery(logQuery);
- boolean foundQueryLog = false;
int delay = 5000;
// sleep for sometime to let query log committed
Thread.sleep(delay);
+ rs = conn.createStatement().executeQuery(logQuery);
+ boolean foundQueryLog = false;
while (rs.next()) {
if (rs.getString(QUERY_ID).equals(queryId)) {
foundQueryLog = true;
@@ -191,12 +207,10 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress());
assertEquals(rs.getString(EXPLAIN_PLAN), null);
assertEquals(rs.getString(GLOBAL_SCAN_DETAILS),null);
- assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0);
+ assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10);
assertEquals(rs.getString(QUERY), query);
- assertEquals(rs.getString(QUERY_STATUS),null);
- assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay);
+ assertEquals(rs.getString(QUERY_STATUS),QueryStatus.COMPLETED.toString());
assertEquals(rs.getString(TENANT_ID), null);
- assertTrue(rs.getString(TOTAL_EXECUTION_TIME) == null);
}
}
assertTrue(foundQueryLog);
@@ -222,12 +236,12 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
}
String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
- rs = conn.createStatement().executeQuery(logQuery);
- boolean foundQueryLog = false;
int delay = 5000;
// sleep for sometime to let query log committed
Thread.sleep(delay);
+ rs = conn.createStatement().executeQuery(logQuery);
+ boolean foundQueryLog = false;
while (rs.next()) {
if (rs.getString(QUERY_ID).equals(queryId)) {
foundQueryLog = true;
@@ -255,7 +269,9 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
props.setProperty(QueryServices.LOG_LEVEL, loglevel.name());
Connection conn = DriverManager.getConnection(getUrl(),props);
assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),loglevel);
-
+ final MyClock clock = new MyClock(100);
+ EnvironmentEdgeManager.injectEdge(clock);
+ try{
String query = "SELECT * FROM " + tableName +" where V = ?";
PreparedStatement pstmt = conn.prepareStatement(query);
@@ -270,12 +286,12 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
ResultSet explainRS = conn.createStatement()
.executeQuery("Explain " + "SELECT * FROM " + tableName + " where V = 'value5'");
String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
- rs = conn.createStatement().executeQuery(logQuery);
- boolean foundQueryLog = false;
int delay = 5000;
-
+
// sleep for sometime to let query log committed
Thread.sleep(delay);
+ rs = conn.createStatement().executeQuery(logQuery);
+ boolean foundQueryLog = false;
while (rs.next()) {
if (rs.getString(QUERY_ID).equals(queryId)) {
foundQueryLog = true;
@@ -286,14 +302,18 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON());
assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 1);
assertEquals(rs.getString(QUERY), query);
- assertEquals(rs.getString(QUERY_STATUS), QueryLogState.COMPLETED.toString());
- assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay);
+ assertEquals(rs.getString(QUERY_STATUS), QueryStatus.COMPLETED.toString());
+ assertTrue(LogLevel.TRACE == loglevel ? rs.getString(SCAN_METRICS_JSON).contains("scanMetrics")
+ : rs.getString(SCAN_METRICS_JSON) == null);
+ assertEquals(rs.getTimestamp(START_TIME).getTime(),100);
assertEquals(rs.getString(TENANT_ID), null);
- assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
}
}
assertTrue(foundQueryLog);
conn.close();
+ }finally{
+ EnvironmentEdgeManager.injectEdge(null);
+ }
}
@@ -315,14 +335,14 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
assertEquals(e.getErrorCode(), SQLExceptionCode.TABLE_UNDEFINED.getErrorCode());
}
String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
- ResultSet rs = conn.createStatement().executeQuery(logQuery);
- boolean foundQueryLog = false;
int delay = 5000;
// sleep for sometime to let query log committed
Thread.sleep(delay);
+ ResultSet rs = conn.createStatement().executeQuery(logQuery);
+ boolean foundQueryLog = false;
while (rs.next()) {
- if (QueryLogState.FAILED.name().equals(rs.getString(QUERY_STATUS))) {
+ if (QueryStatus.FAILED.name().equals(rs.getString(QUERY_STATUS))) {
foundQueryLog = true;
assertEquals(rs.getString(USER), System.getProperty("user.name"));
assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress());
@@ -331,8 +351,6 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0);
assertEquals(rs.getString(QUERY), query);
assertTrue(rs.getString(EXCEPTION_TRACE).contains(SQLExceptionCode.TABLE_UNDEFINED.getMessage()));
- assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay);
- assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
}
}
assertTrue(foundQueryLog);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index e45ddcd..73cdf0a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -10,6 +10,10 @@
package org.apache.phoenix.monitoring;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_FAILED_COUNT;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
@@ -29,19 +33,6 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQ
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
-
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_CALLS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_NOT_SERVING_REGION_EXCEPTION;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_RETRIES;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_SCANNED;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_FILTERED;
-
import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
@@ -73,11 +64,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.LoggingPhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.jdbc.PhoenixMetricsLog;
import org.apache.phoenix.jdbc.PhoenixResultSet;
-import org.apache.phoenix.jdbc.LoggingPhoenixConnection;
+import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -109,6 +101,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
// disable renewing leases as this will force spooling to happen.
props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
+ props.put(QueryServices.LOG_LEVEL, LogLevel.DEBUG.toString());
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
// need the non-test driver for some tests that check number of hconnections, etc.
DriverManager.registerDriver(PhoenixDriver.INSTANCE);
@@ -377,6 +370,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
insertRowsInTable(tableName, numRows);
Properties props = new Properties();
props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "false");
+ props.setProperty(QueryServices.LOG_LEVEL, LogLevel.OFF.name());
Connection conn = DriverManager.getConnection(getUrl(), props);
ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
while (rs.next()) {}
@@ -706,7 +700,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
private void changeInternalStateForTesting(PhoenixResultSet rs) {
// get and set the internal state for testing purposes.
- ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(true);
+ ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(LogLevel.DEBUG);
StatementContext ctx = (StatementContext)Whitebox.getInternalState(rs, "context");
Whitebox.setInternalState(ctx, "readMetricsQueue", testMetricsQueue);
Whitebox.setInternalState(rs, "readMetricsQueue", testMetricsQueue);
@@ -772,8 +766,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
private class TestReadMetricsQueue extends ReadMetricQueue {
- public TestReadMetricsQueue(boolean isRequestMetricsEnabled) {
- super(isRequestMetricsEnabled);
+ public TestReadMetricsQueue(LogLevel connectionLogLevel) {
+ super(connectionLogLevel);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 3ea5dd5..4358ee3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -135,8 +135,8 @@ public class StatementContext {
this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer> emptyMap() : Maps
.<PColumn, Integer> newLinkedHashMap();
this.subqueryResults = Maps.<SelectStatement, Object> newHashMap();
- this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled);
- this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled);
+ this.readMetricsQueue = new ReadMetricQueue(connection.getLogLevel());
+ this.overAllQueryMetrics = new OverAllQueryMetrics(connection.getLogLevel());
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 18f4fea..f3a383e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -39,8 +39,8 @@ import javax.annotation.concurrent.Immutable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -1181,7 +1181,7 @@ public class MutationState implements SQLCloseable {
numFailedMutations = uncommittedStatementIndexes.length;
GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations);
} finally {
- MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations);
+ MutationMetric mutationsMetric = new MutationMetric(connection.getLogLevel(),numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations);
mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
try {
if (cache!=null)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index 8595fd4..acb6c04 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -147,7 +147,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
String tableName = tableRef.getTable().getPhysicalName().getString();
ReadMetricQueue readMetrics = context.getReadMetricsQueue();
ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
- readMetrics.isRequestMetricsEnabled());
+ context.getConnection().getLogLevel());
long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
ResultIterator singleChunkResultIterator =
new SingleChunkResultIterator(new TableResultIterator(mutationState, scan,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 41d278d..262ae44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -107,11 +107,10 @@ public class ParallelIterators extends BaseResultIterators {
context.getOverallQueryMetrics().updateNumParallelScans(numScans);
GLOBAL_NUM_PARALLEL_SCANS.update(numScans);
final long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
- boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
for (final ScanLocator scanLocation : scanLocations) {
final Scan scan = scanLocation.getScan();
final ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, physicalTableName,
- scan, isRequestMetricsEnabled);
+ scan, context.getConnection().getLogLevel());
final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
final TableResultIterator tableResultItr =
context.getConnection().getTableResultIteratorFactory().newIterator(
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index c13fcdb..1693421 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -171,7 +171,6 @@ public class SerialIterators extends BaseResultIterators {
return EMPTY_ITERATOR;
}
ReadMetricQueue readMetrics = context.getReadMetricsQueue();
- boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
while (index < scans.size()) {
Scan currentScan = scans.get(index++);
if (remainingOffset != null) {
@@ -179,7 +178,7 @@ public class SerialIterators extends BaseResultIterators {
}
ScanMetricsHolder scanMetricsHolder =
ScanMetricsHolder.getInstance(readMetrics, tableName, currentScan,
- isRequestMetricsEnabled);
+ context.getConnection().getLogLevel());
TableResultIterator itr =
new TableResultIterator(mutationState, currentScan, scanMetricsHolder,
renewLeaseThreshold, plan, scanGrouper, caches);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index d3626f8..312d17b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -364,9 +364,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
function.getTenantId())));
}
};
- this.isRequestLevelMetricsEnabled = JDBCUtil
- .isCollectingRequestLevelMetricsEnabled(url, info,
- this.services.getProps());
+ this.logLevel= LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL,
+ QueryServicesOptions.DEFAULT_LOGGING_LEVEL));
+ this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info,
+ this.services.getProps());
this.mutationState = mutationState == null ? newMutationState(maxSize,
maxSizeBytes) : new MutationState(mutationState);
this.metaData = metaData;
@@ -380,8 +381,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
this.scannerQueue = new LinkedBlockingQueue<>();
this.tableResultIteratorFactory = new DefaultTableResultIteratorFactory();
this.isRunningUpgrade = isRunningUpgrade;
- this.logLevel= LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL,
- QueryServicesOptions.DEFAULT_LOGGING_LEVEL));
+
this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE,
QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE));
GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 153fa08..84816a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -51,8 +51,8 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.log.QueryLogInfo;
-import org.apache.phoenix.log.QueryLogState;
import org.apache.phoenix.log.QueryLogger;
+import org.apache.phoenix.log.QueryStatus;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.monitoring.OverAllQueryMetrics;
import org.apache.phoenix.monitoring.ReadMetricQueue;
@@ -76,8 +76,6 @@ import org.apache.phoenix.util.SQLCloseable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
@@ -133,9 +131,9 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
private Long count = 0L;
- private QueryLogState logStatus = QueryLogState.COMPLETED;
+ private Object exception;
+
- private RuntimeException exception;
public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, StatementContext ctx) throws SQLException {
this.rowProjector = rowProjector;
@@ -144,7 +142,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
this.statement = context.getStatement();
this.readMetricsQueue = context.getReadMetricsQueue();
this.overAllQueryMetrics = context.getOverallQueryMetrics();
- this.queryLogger = context.getQueryLogger();
+ this.queryLogger = context.getQueryLogger() != null ? context.getQueryLogger() : QueryLogger.NO_OP_INSTANCE;
}
@Override
@@ -181,6 +179,19 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
statement.getResultSets().remove(this);
overAllQueryMetrics.endQuery();
overAllQueryMetrics.stopResultSetWatch();
+ if (!queryLogger.isSynced()) {
+ if(this.exception==null){
+ queryLogger.log(QueryLogInfo.QUERY_STATUS_I,QueryStatus.COMPLETED.toString());
+ }
+ queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count);
+ if (queryLogger.isDebugEnabled()) {
+ queryLogger.log(QueryLogInfo.SCAN_METRICS_JSON_I,
+ readMetricsQueue.getScanMetricsHolderList().toString());
+ readMetricsQueue.getScanMetricsHolderList().clear();
+ }
+ // if not already synced , like closing before result set exhausted
+ queryLogger.sync(getReadMetrics(), getOverAllRequestReadMetrics());
+ }
}
}
@@ -799,36 +810,33 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
}
rowProjector.reset();
} catch (RuntimeException e) {
- this.logStatus=QueryLogState.FAILED;
// FIXME: Expression.evaluate does not throw SQLException
// so this will unwrap throws from that.
+ queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString());
+ if (queryLogger.isDebugEnabled()) {
+ queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
+ }
this.exception = e;
if (e.getCause() instanceof SQLException) {
throw (SQLException) e.getCause();
}
throw e;
}finally{
- if (currentRow == null && queryLogger != null ) {
+ if (this.exception!=null) {
+ queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count);
if (queryLogger.isDebugEnabled()) {
- Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
- queryLogBuilder.put(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count);
- queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I,
- System.currentTimeMillis() - queryLogger.getStartTime());
- queryLogBuilder.put(QueryLogInfo.SCAN_METRICS_JSON_I,
+ queryLogger.log(QueryLogInfo.SCAN_METRICS_JSON_I,
readMetricsQueue.getScanMetricsHolderList().toString());
- if (this.exception != null) {
- queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I,
- Throwables.getStackTraceAsString(this.exception));
- }
readMetricsQueue.getScanMetricsHolderList().clear();
- queryLogger.log(logStatus, queryLogBuilder.build());
+ }
+ if (queryLogger != null) {
+ queryLogger.sync(getReadMetrics(), getOverAllRequestReadMetrics());
}
}
- }
- if (currentRow == null) {
-
- overAllQueryMetrics.endQuery();
- overAllQueryMetrics.stopResultSetWatch();
+ if (currentRow == null) {
+ overAllQueryMetrics.endQuery();
+ overAllQueryMetrics.stopResultSetWatch();
+ }
}
return currentRow != null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 25b9fb0..015f04c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -93,7 +93,7 @@ import org.apache.phoenix.iterate.MaterializedResultIterator;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.log.QueryLogInfo;
-import org.apache.phoenix.log.QueryLogState;
+import org.apache.phoenix.log.QueryStatus;
import org.apache.phoenix.log.QueryLogger;
import org.apache.phoenix.log.QueryLoggerUtil;
import org.apache.phoenix.optimize.Cost;
@@ -190,8 +190,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.math.IntMath;
@@ -319,10 +317,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
StatementContext context = plan.getContext();
context.setQueryLogger(queryLogger);
if(queryLogger.isDebugEnabled()){
- Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
- queryLogBuilder.put(QueryLogInfo.EXPLAIN_PLAN_I, QueryUtil.getExplainPlan(resultIterator));
- queryLogBuilder.put(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, context.getScan()!=null?context.getScan().toString():null);
- queryLogger.log(QueryLogState.COMPILED, queryLogBuilder.build());
+ queryLogger.log(QueryLogInfo.EXPLAIN_PLAN_I, QueryUtil.getExplainPlan(resultIterator));
+ queryLogger.log(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, context.getScan()!=null?context.getScan().toString():null);
}
context.getOverallQueryMetrics().startQuery();
PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), plan.getContext());
@@ -351,6 +347,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
throw e;
}catch (RuntimeException e) {
+
// FIXME: Expression.evaluate does not throw SQLException
// so this will unwrap throws from that.
if (e.getCause() instanceof SQLException) {
@@ -367,11 +364,9 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}, PhoenixContextExecutor.inContext());
}catch (Exception e) {
if (queryLogger.isDebugEnabled()) {
- Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
- queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I,
- System.currentTimeMillis() - queryLogger.getStartTime());
- queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
- queryLogger.log(QueryLogState.FAILED, queryLogBuilder.build());
+ queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
+ queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString());
+ queryLogger.sync(null, null);
}
Throwables.propagateIfInstanceOf(e, SQLException.class);
Throwables.propagate(e);
@@ -1781,7 +1776,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
QueryLogger queryLogger = QueryLogger.getInstance(connection,isSystemTable);
QueryLoggerUtil.logInitialDetails(queryLogger, connection.getTenantId(),
- connection.getQueryServices(), sql, queryLogger.getStartTime(), getParameters());
+ connection.getQueryServices(), sql, getParameters());
return queryLogger;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
index 5792658..269b4f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
@@ -18,5 +18,5 @@
package org.apache.phoenix.log;
public enum LogLevel {
- OFF, INFO, DEBUG, TRACE
+ OFF,INFO, DEBUG, TRACE
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
index 817f9ec..dab03e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
@@ -31,16 +31,18 @@ public interface LogWriter {
* @param event
* @throws SQLException
* @throws IOException
+ * @throws ClassNotFoundException
*/
- void write(RingBufferEvent event) throws SQLException, IOException;
+ void write(RingBufferEvent event) throws SQLException, IOException, ClassNotFoundException;
/**
* will be called when disruptor is getting shutdown
*
* @throws IOException
+ * @throws SQLException
*/
- void close() throws IOException;
+ void close() throws IOException, SQLException;
/**
* if writer is closed and cannot write further event
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
index 87de267..fb38ba2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
@@ -28,8 +28,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
import org.apache.phoenix.schema.types.PDataType;
@@ -40,29 +40,27 @@ import org.apache.phoenix.schema.types.PVarchar;
public enum QueryLogInfo {
- CLIENT_IP_I(CLIENT_IP, QueryLogState.STARTED, LogLevel.INFO, PVarchar.INSTANCE),
- QUERY_I(QUERY,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
- BIND_PARAMETERS_I(BIND_PARAMETERS,QueryLogState.STARTED, LogLevel.TRACE,PVarchar.INSTANCE),
- QUERY_ID_I(QUERY_ID,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
- TENANT_ID_I(TENANT_ID,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
- START_TIME_I(START_TIME,QueryLogState.STARTED, LogLevel.INFO,PTimestamp.INSTANCE),
- USER_I(USER,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
- EXPLAIN_PLAN_I(EXPLAIN_PLAN,QueryLogState.COMPILED, LogLevel.DEBUG,PVarchar.INSTANCE),
- GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS,QueryLogState.COMPILED, LogLevel.DEBUG,PVarchar.INSTANCE),
- NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED,QueryLogState.COMPLETED, LogLevel.DEBUG,PLong.INSTANCE),
- EXCEPTION_TRACE_I(EXCEPTION_TRACE,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE),
- QUERY_STATUS_I(QUERY_STATUS,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE),
- TOTAL_EXECUTION_TIME_I(TOTAL_EXECUTION_TIME,QueryLogState.COMPLETED, LogLevel.DEBUG,PLong.INSTANCE),
- SCAN_METRICS_JSON_I(SCAN_METRICS_JSON,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE);
+ CLIENT_IP_I(CLIENT_IP, LogLevel.INFO, PVarchar.INSTANCE),
+ QUERY_I(QUERY, LogLevel.INFO,PVarchar.INSTANCE),
+ BIND_PARAMETERS_I(BIND_PARAMETERS, LogLevel.TRACE,PVarchar.INSTANCE),
+ QUERY_ID_I(QUERY_ID, LogLevel.INFO,PVarchar.INSTANCE),
+ TENANT_ID_I(TENANT_ID, LogLevel.INFO,PVarchar.INSTANCE),
+ START_TIME_I(START_TIME, LogLevel.INFO,PTimestamp.INSTANCE),
+ USER_I(USER, LogLevel.INFO,PVarchar.INSTANCE),
+ EXPLAIN_PLAN_I(EXPLAIN_PLAN,LogLevel.DEBUG,PVarchar.INSTANCE),
+ GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS, LogLevel.DEBUG,PVarchar.INSTANCE),
+ NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED, LogLevel.INFO,PLong.INSTANCE),
+ EXCEPTION_TRACE_I(EXCEPTION_TRACE, LogLevel.DEBUG,PVarchar.INSTANCE),
+ QUERY_STATUS_I(QUERY_STATUS, LogLevel.INFO,PVarchar.INSTANCE),
+ SCAN_METRICS_JSON_I(SCAN_METRICS_JSON, LogLevel.TRACE,PVarchar.INSTANCE),
+ TABLE_NAME_I(TABLE_NAME, LogLevel.DEBUG,PVarchar.INSTANCE);
public final String columnName;
- public final QueryLogState logState;
public final LogLevel logLevel;
public final PDataType dataType;
- private QueryLogInfo(String columnName, QueryLogState logState, LogLevel logLevel, PDataType dataType) {
+ private QueryLogInfo(String columnName, LogLevel logLevel, PDataType dataType) {
this.columnName = columnName;
- this.logState=logState;
this.logLevel=logLevel;
this.dataType=dataType;
}
@@ -71,10 +69,6 @@ public enum QueryLogInfo {
return columnName;
}
- public QueryLogState getLogState() {
- return logState;
- }
-
public LogLevel getLogLevel() {
return logLevel;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
deleted file mode 100644
index e27f0e8..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.log;
-
-public enum QueryLogState {
- STARTED, PLAN, COMPILED, EXECUTION, COMPLETED,FAILED
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
index b2fb235..ef5559c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
@@ -17,13 +17,17 @@
*/
package org.apache.phoenix.log;
+import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
import io.netty.util.internal.ThreadLocalRandom;
@@ -34,15 +38,17 @@ public class QueryLogger {
private final ThreadLocal<RingBufferEventTranslator> threadLocalTranslator = new ThreadLocal<>();
private QueryLoggerDisruptor queryDisruptor;
private String queryId;
- private Long startTime;
private LogLevel logLevel;
+ private Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
+ private boolean isSynced;
private static final Log LOG = LogFactory.getLog(QueryLoggerDisruptor.class);
private QueryLogger(PhoenixConnection connection) {
this.queryId = UUID.randomUUID().toString();
this.queryDisruptor = connection.getQueryServices().getQueryDisruptor();
- this.startTime = System.currentTimeMillis();
logLevel = connection.getLogLevel();
+ log(QueryLogInfo.QUERY_ID_I, queryId);
+ log(QueryLogInfo.START_TIME_I, EnvironmentEdgeManager.currentTimeMillis());
}
private QueryLogger() {
@@ -58,21 +64,32 @@ public class QueryLogger {
return result;
}
- private static final QueryLogger NO_OP_INSTANCE = new QueryLogger() {
+ public static final QueryLogger NO_OP_INSTANCE = new QueryLogger() {
@Override
- public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> map) {
+ public void log(QueryLogInfo queryLogInfo, Object info) {
}
-
+
@Override
- public boolean isDebugEnabled(){
+ public boolean isDebugEnabled() {
return false;
}
-
+
@Override
- public boolean isInfoEnabled(){
+ public boolean isInfoEnabled() {
return false;
}
+
+ @Override
+ public void sync(
+ Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) {
+
+ }
+
+ @Override
+ public boolean isSynced(){
+ return true;
+ }
};
public static QueryLogger getInstance(PhoenixConnection connection, boolean isSystemTable) {
@@ -82,14 +99,14 @@ public class QueryLogger {
}
/**
- * Add query log in the table, columns will be logged depending upon the connection logLevel
- * @param logState State of the query
- * @param map Value of the map should be in format of the corresponding data type
+ * Add query log in the table, columns will be logged depending upon the connection logLevel
*/
- public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> map) {
- final RingBufferEventTranslator translator = getCachedTranslator();
- translator.setQueryInfo(logState, map, logLevel);
- publishLogs(translator);
+ public void log(QueryLogInfo queryLogInfo, Object info) {
+ try {
+ queryLogBuilder.put(queryLogInfo, info);
+ } catch (Exception e) {
+ LOG.warn("Unable to add log info because of " + e.getMessage());
+ }
}
private boolean publishLogs(RingBufferEventTranslator translator) {
@@ -102,13 +119,6 @@ public class QueryLogger {
}
/**
- * Start time when the logger was started, if {@link LogLevel#OFF} then it's the current time
- */
- public Long getStartTime() {
- return startTime != null ? startTime : System.currentTimeMillis();
- }
-
- /**
* Is debug logging currently enabled?
* Call this method to prevent having to perform expensive operations (for example, String concatenation) when the log level is more than debug.
*/
@@ -117,7 +127,8 @@ public class QueryLogger {
}
private boolean isLevelEnabled(LogLevel logLevel){
- return this.logLevel != null ? logLevel.ordinal() <= this.logLevel.ordinal() : false;
+ return this.logLevel != null && logLevel != LogLevel.OFF ? logLevel.ordinal() <= this.logLevel.ordinal()
+ : false;
}
/**
@@ -142,4 +153,21 @@ public class QueryLogger {
return this.queryId;
}
+
+ public void sync(Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) {
+ if (!isSynced) {
+ isSynced = true;
+ final RingBufferEventTranslator translator = getCachedTranslator();
+ translator.setQueryInfo(logLevel, queryLogBuilder.build(), readMetrics, overAllMetrics);
+ publishLogs(translator);
+ }
+ }
+
+ /**
+ * Is Synced already
+ */
+ public boolean isSynced(){
+ return this.isSynced;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
index d5c4878..21917ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
@@ -25,48 +25,36 @@ import org.apache.commons.lang.StringUtils;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PName;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
-
public class QueryLoggerUtil {
+
public static void logInitialDetails(QueryLogger queryLogger, PName tenantId, ConnectionQueryServices queryServices,
- String query, long startTime, List<Object> bindParameters) {
+ String query, List<Object> bindParameters) {
try {
- queryLogger.log(QueryLogState.STARTED,
- getInitialDetails(tenantId, queryServices, query, startTime, bindParameters));
+ String clientIP;
+ try {
+ clientIP = InetAddress.getLocalHost().getHostAddress();
+ } catch (UnknownHostException e) {
+ clientIP = "UnknownHost";
+ }
+
+ if (clientIP != null) {
+ queryLogger.log(QueryLogInfo.CLIENT_IP_I, clientIP);
+ }
+ if (query != null) {
+ queryLogger.log(QueryLogInfo.QUERY_I, query);
+ }
+ if (bindParameters != null) {
+ queryLogger.log(QueryLogInfo.BIND_PARAMETERS_I, StringUtils.join(bindParameters, ","));
+ }
+ if (tenantId != null) {
+ queryLogger.log(QueryLogInfo.TENANT_ID_I, tenantId.getString());
+ }
+
+ queryLogger.log(QueryLogInfo.USER_I, queryServices.getUserName() != null ? queryServices.getUserName()
+ : queryServices.getUser().getShortName());
} catch (Exception e) {
- // Ignore for now
- }
-
- }
-
- private static ImmutableMap<QueryLogInfo, Object> getInitialDetails(PName tenantId,
- ConnectionQueryServices queryServices, String query, long startTime, List<Object> bindParameters) {
- Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
- String clientIP;
- try {
- clientIP = InetAddress.getLocalHost().getHostAddress();
- } catch (UnknownHostException e) {
- clientIP = "UnknownHost";
+ // Ignore
}
-
- if (clientIP != null) {
- queryLogBuilder.put(QueryLogInfo.CLIENT_IP_I, clientIP);
- }
- if (query != null) {
- queryLogBuilder.put(QueryLogInfo.QUERY_I, query);
- }
- queryLogBuilder.put(QueryLogInfo.START_TIME_I, startTime);
- if (bindParameters != null) {
- queryLogBuilder.put(QueryLogInfo.BIND_PARAMETERS_I, StringUtils.join(bindParameters, ","));
- }
- if (tenantId != null) {
- queryLogBuilder.put(QueryLogInfo.TENANT_ID_I, tenantId.getString());
- }
-
- queryLogBuilder.put(QueryLogInfo.USER_I, queryServices.getUserName() != null ? queryServices.getUserName()
- : queryServices.getUser().getShortName());
- return queryLogBuilder.build();
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java
new file mode 100644
index 0000000..0e634c1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+public enum QueryStatus {
+ COMPILED, COMPLETED,FAILED
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
index 96e4bf9..8854e68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
@@ -17,14 +17,19 @@
*/
package org.apache.phoenix.log;
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.MetricType;
+
import com.google.common.collect.ImmutableMap;
import com.lmax.disruptor.EventFactory;
class RingBufferEvent {
private String queryId;
- private QueryLogState logState;
private LogLevel connectionLogLevel;
private ImmutableMap<QueryLogInfo, Object> queryInfo;
+ private Map<String, Map<MetricType, Long>> readMetrics;
+ private Map<MetricType, Long> overAllMetrics;
public static final Factory FACTORY = new Factory();
@@ -40,7 +45,6 @@ import com.lmax.disruptor.EventFactory;
}
public void clear() {
- this.logState=null;
this.queryInfo=null;
this.queryId=null;
}
@@ -53,10 +57,6 @@ import com.lmax.disruptor.EventFactory;
public static Factory getFactory() {
return FACTORY;
}
-
- public QueryLogState getLogState() {
- return logState;
- }
public void setQueryInfo(ImmutableMap<QueryLogInfo, Object> queryInfo) {
this.queryInfo=queryInfo;
@@ -73,12 +73,6 @@ import com.lmax.disruptor.EventFactory;
}
- public void setLogState(QueryLogState logState) {
- this.logState=logState;
-
- }
-
-
public LogLevel getConnectionLogLevel() {
return connectionLogLevel;
}
@@ -88,6 +82,26 @@ import com.lmax.disruptor.EventFactory;
this.connectionLogLevel = connectionLogLevel;
}
+
+ public Map<String, Map<MetricType, Long>> getReadMetrics() {
+ return readMetrics;
+ }
+
+
+ public void setReadMetrics(Map<String, Map<MetricType, Long>> readMetrics) {
+ this.readMetrics = readMetrics;
+ }
+
+
+ public Map<MetricType, Long> getOverAllMetrics() {
+ return overAllMetrics;
+ }
+
+
+ public void setOverAllMetrics(Map<MetricType, Long> overAllMetrics) {
+ this.overAllMetrics = overAllMetrics;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
index 653ddd6..742f8e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
@@ -17,14 +17,19 @@
*/
package org.apache.phoenix.log;
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.MetricType;
+
import com.google.common.collect.ImmutableMap;
import com.lmax.disruptor.EventTranslator;
class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> {
private String queryId;
- private QueryLogState logState;
private ImmutableMap<QueryLogInfo, Object> queryInfo;
private LogLevel connectionLogLevel;
+ private Map<String, Map<MetricType, Long>> readMetrics;
+ private Map<MetricType, Long> overAllMetrics;
public RingBufferEventTranslator(String queryId) {
this.queryId=queryId;
@@ -34,20 +39,22 @@ class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> {
public void translateTo(RingBufferEvent event, long sequence) {
event.setQueryId(queryId);
event.setQueryInfo(queryInfo);
- event.setLogState(logState);
+ event.setReadMetrics(readMetrics);
+ event.setOverAllMetrics(overAllMetrics);
event.setConnectionLogLevel(connectionLogLevel);
clear();
}
private void clear() {
- setQueryInfo(null,null,null);
+ setQueryInfo(null,null,null,null);
}
- public void setQueryInfo(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> queryInfo,
- LogLevel connectionLogLevel) {
+ public void setQueryInfo(LogLevel logLevel, ImmutableMap<QueryLogInfo, Object> queryInfo, Map<String, Map<MetricType, Long>> readMetrics,
+ Map<MetricType, Long> overAllMetrics) {
this.queryInfo = queryInfo;
- this.logState = logState;
- this.connectionLogLevel = connectionLogLevel;
+ this.connectionLogLevel = logLevel;
+ this.readMetrics = readMetrics;
+ this.overAllMetrics=overAllMetrics;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
index c102855..0209951 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
@@ -21,23 +21,17 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCH
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.Map.Entry;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.expression.Determinism;
-import org.apache.phoenix.expression.LiteralExpression;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.util.QueryUtil;
import com.google.common.collect.ImmutableMap;
@@ -49,75 +43,111 @@ public class TableLogWriter implements LogWriter {
private static final Log LOG = LogFactory.getLog(LogWriter.class);
private Connection connection;
private boolean isClosed;
- private Table table;
+ private PreparedStatement upsertStatement;
private Configuration config;
+ private Map<MetricType,Integer> metricOrdinals=new HashMap<MetricType,Integer>();
public TableLogWriter(Configuration configuration) {
- this.config = configuration;
- try {
- this.connection = ConnectionFactory.createConnection(configuration);
- table = this.connection.getTable(SchemaUtil.getPhysicalTableName(
- SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_SCHEMA, SYSTEM_LOG_TABLE), config));
- } catch (Exception e) {
- LOG.warn("Unable to initiate LogWriter for writing query logs to table");
+ this.config=configuration;
+ }
+
+ private PreparedStatement buildUpsertStatement(Connection conn) throws SQLException {
+ StringBuilder buf = new StringBuilder("UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"(");
+ int queryLogEntries=0;
+ for (QueryLogInfo info : QueryLogInfo.values()) {
+ buf.append(info.columnName);
+ buf.append(',');
+ queryLogEntries++;
+ }
+ for (MetricType metric : MetricType.values()) {
+ if (metric.logLevel() != LogLevel.OFF) {
+ metricOrdinals.put(metric, ++queryLogEntries);
+ buf.append(metric.columnName());
+ buf.append(',');
+ }
+ }
+ buf.setLength(buf.length()-1);
+ buf.append(") VALUES (");
+ for (int i = 0; i < QueryLogInfo.values().length; i++) {
+ buf.append("?,");
}
+ for (MetricType metric : MetricType.values()) {
+ if (metric.logLevel() != LogLevel.OFF) {
+ buf.append("?,");
+ }
+ }
+ buf.setLength(buf.length()-1);
+ buf.append(")");
+ return conn.prepareStatement(buf.toString());
}
@Override
- public void write(RingBufferEvent event) throws SQLException, IOException {
- if(isClosed()){
+ public void write(RingBufferEvent event) throws SQLException, IOException, ClassNotFoundException {
+ if (isClosed()) {
LOG.warn("Unable to commit query log as Log committer is already closed");
return;
}
- if (table == null || connection == null) {
- LOG.warn("Unable to commit query log as connection was not initiated ");
- return;
+ if (connection == null) {
+ synchronized (this) {
+ if (connection == null) {
+ connection = QueryUtil.getConnectionForQueryLog(this.config);
+ this.upsertStatement = buildUpsertStatement(connection);
+ }
+ }
}
- ImmutableMap<QueryLogInfo, Object> queryInfo=event.getQueryInfo();
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- Put put =new Put(Bytes.toBytes(event.getQueryId()));
- for (Entry<QueryLogInfo, Object> entry : queryInfo.entrySet()) {
- if (entry.getKey().logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()) {
- LiteralExpression expression = LiteralExpression.newConstant(entry.getValue(), entry.getKey().dataType,
- Determinism.ALWAYS);
- expression.evaluate(null, ptr);
- put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes(entry.getKey().columnName),
- ByteUtil.copyKeyBytesIfNecessary(ptr));
+
+ ImmutableMap<QueryLogInfo, Object> queryInfoMap = event.getQueryInfo();
+ for (QueryLogInfo info : QueryLogInfo.values()) {
+ if (queryInfoMap.containsKey(info) && info.logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()) {
+ upsertStatement.setObject(info.ordinal() + 1, queryInfoMap.get(info));
+ } else {
+ upsertStatement.setObject(info.ordinal() + 1, null);
}
}
-
- if (QueryLogInfo.QUERY_STATUS_I.logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()
- && (event.getLogState() == QueryLogState.COMPLETED || event.getLogState() == QueryLogState.FAILED)) {
- LiteralExpression expression = LiteralExpression.newConstant(event.getLogState().toString(),
- QueryLogInfo.QUERY_STATUS_I.dataType, Determinism.ALWAYS);
- expression.evaluate(null, ptr);
- put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
- Bytes.toBytes(QueryLogInfo.QUERY_STATUS_I.columnName), ByteUtil.copyKeyBytesIfNecessary(ptr));
+ Map<MetricType, Long> overAllMetrics = event.getOverAllMetrics();
+ Map<String, Map<MetricType, Long>> readMetrics = event.getReadMetrics();
+
+ for (MetricType metric : MetricType.values()) {
+ if (overAllMetrics != null && overAllMetrics.containsKey(metric)
+ && metric.isLoggingEnabled(event.getConnectionLogLevel())) {
+ upsertStatement.setObject(metricOrdinals.get(metric), overAllMetrics.get(metric));
+ } else {
+ if (metric.logLevel() != LogLevel.OFF) {
+ upsertStatement.setObject(metricOrdinals.get(metric), null);
+ }
+ }
+ }
+
+ if (readMetrics != null && !readMetrics.isEmpty()) {
+ for (Map.Entry<String, Map<MetricType, Long>> entry : readMetrics.entrySet()) {
+ upsertStatement.setObject(QueryLogInfo.TABLE_NAME_I.ordinal() + 1, entry.getKey());
+ for (MetricType metric : entry.getValue().keySet()) {
+ if (metric.isLoggingEnabled(event.getConnectionLogLevel())) {
+ upsertStatement.setObject(metricOrdinals.get(metric), entry.getValue().get(metric));
+ }
+ }
+ upsertStatement.executeUpdate();
+ }
+ } else {
+ upsertStatement.executeUpdate();
}
- put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
- table.put(put);
-
+ connection.commit();
}
@Override
public void close() throws IOException {
- if(isClosed()){
- return;
- }
- isClosed=true;
+ if (isClosed()) { return; }
+ isClosed = true;
try {
- if (table != null) {
- table.close();
- }
- if (connection != null && !connection.isClosed()) {
- //It should internally close all the statements
+ if (connection != null) {
+ // It should internally close all the statements
connection.close();
}
- } catch (IOException e) {
+ } catch (SQLException e) {
// TODO Ignore?
}
}
-
+
public boolean isClosed(){
return isClosed;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index ec1b451..58c048b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -35,7 +35,15 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.iterate.*;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.RoundRobinResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.iterate.TableSnapshotResultIterator;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.monitoring.ReadMetricQueue;
@@ -112,7 +120,6 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
services.clearTableRegionCache(tableNameBytes);
long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
- boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled();
for (Scan scan : scans) {
// For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
@@ -120,7 +127,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
PeekingResultIterator peekingResultIterator;
ScanMetricsHolder scanMetricsHolder =
ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
- isRequestMetricsEnabled);
+ queryPlan.getContext().getConnection().getLogLevel());
if (snapshotName != null) {
// result iterator to read snapshots
final TableSnapshotResultIterator tableSnapshotResultIterator = new TableSnapshotResultIterator(configuration, scan,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
index 0e82ce4..daa0bba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
@@ -26,7 +26,6 @@ import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
public class MemoryMetricsHolder {
private final CombinableMetric memoryChunkSizeMetric;
private final CombinableMetric memoryWaitTimeMetric;
- public static final MemoryMetricsHolder NO_OP_INSTANCE = new MemoryMetricsHolder(new ReadMetricQueue(false), null);
public MemoryMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
this.memoryChunkSizeMetric = readMetrics.allotMetric(MEMORY_CHUNK_BYTES, tableName);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index ef6eceb..8e1de66 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -17,62 +17,74 @@
*/
package org.apache.phoenix.monitoring;
+import org.apache.phoenix.log.LogLevel;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PLong;
+
+
+/**
+ * Keeping {@link LogLevel#OFF} for metrics that are calculated globally only and do not need to be logged in SYSTEM.LOG
+ */
public enum MetricType {
- NO_OP_METRIC("no", "No op metric"),
+ NO_OP_METRIC("no", "No op metric",LogLevel.OFF, PLong.INSTANCE),
// mutation (write) related metrics
- MUTATION_BATCH_SIZE("ms", "Number of mutations in the batch"),
- MUTATION_BYTES("mb", "Size of mutations in bytes"),
- MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of mutations"),
- MUTATION_BATCH_FAILED_SIZE("mfs", "Number of mutations that failed to be committed"),
- MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql statements"),
+ MUTATION_BATCH_SIZE("ms", "Number of mutations in the batch",LogLevel.OFF, PLong.INSTANCE),
+ MUTATION_BYTES("mb", "Size of mutations in bytes",LogLevel.OFF, PLong.INSTANCE),
+ MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of mutations",LogLevel.OFF, PLong.INSTANCE),
+ MUTATION_BATCH_FAILED_SIZE("mfs", "Number of mutations that failed to be committed",LogLevel.OFF, PLong.INSTANCE),
+ MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql statements",LogLevel.OFF, PLong.INSTANCE),
// query (read) related metrics
- QUERY_TIME("qt", "Query times"),
- QUERY_TIMEOUT_COUNTER("qo", "Number of times query timed out"),
- QUERY_FAILED_COUNTER("qf", "Number of times query failed"),
- NUM_PARALLEL_SCANS("ps", "Number of scans that were executed in parallel"),
- SCAN_BYTES("sb", "Number of bytes read by scans"),
- SELECT_SQL_COUNTER("sc", "Counter for number of sql queries"),
+ QUERY_TIME("qt", "Query times",LogLevel.OFF, PLong.INSTANCE),
+ QUERY_TIMEOUT_COUNTER("qo", "Number of times query timed out",LogLevel.DEBUG, PLong.INSTANCE),
+ QUERY_FAILED_COUNTER("qf", "Number of times query failed",LogLevel.DEBUG, PLong.INSTANCE),
+ NUM_PARALLEL_SCANS("ps", "Number of scans that were executed in parallel",LogLevel.DEBUG, PLong.INSTANCE),
+ SCAN_BYTES("sb", "Number of bytes read by scans",LogLevel.OFF, PLong.INSTANCE),
+ SELECT_SQL_COUNTER("sc", "Counter for number of sql queries",LogLevel.OFF, PLong.INSTANCE),
// task metrics
- TASK_QUEUE_WAIT_TIME("tw", "Time in milliseconds tasks had to wait in the queue of the thread pool executor"),
- TASK_END_TO_END_TIME("tee", "Time in milliseconds spent by tasks from creation to completion"),
- TASK_EXECUTION_TIME("tx", "Time in milliseconds tasks took to execute"),
- TASK_EXECUTED_COUNTER("te", "Counter for number of tasks submitted to the thread pool executor"),
- TASK_REJECTED_COUNTER("tr", "Counter for number of tasks that were rejected by the thread pool executor"),
+ TASK_QUEUE_WAIT_TIME("tw", "Time in milliseconds tasks had to wait in the queue of the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE),
+ TASK_END_TO_END_TIME("tee", "Time in milliseconds spent by tasks from creation to completion",LogLevel.DEBUG, PLong.INSTANCE),
+ TASK_EXECUTION_TIME("tx", "Time in milliseconds tasks took to execute",LogLevel.DEBUG, PLong.INSTANCE),
+ TASK_EXECUTED_COUNTER("te", "Counter for number of tasks submitted to the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE),
+ TASK_REJECTED_COUNTER("tr", "Counter for number of tasks that were rejected by the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE),
// spool metrics
- SPOOL_FILE_SIZE("ss", "Size of spool files created in bytes"),
- SPOOL_FILE_COUNTER("sn", "Number of spool files created"),
+ SPOOL_FILE_SIZE("ss", "Size of spool files created in bytes",LogLevel.DEBUG, PLong.INSTANCE),
+ SPOOL_FILE_COUNTER("sn", "Number of spool files created",LogLevel.DEBUG, PLong.INSTANCE),
// misc metrics
- MEMORY_CHUNK_BYTES("mc", "Number of bytes allocated by the memory manager"),
- MEMORY_WAIT_TIME("mw", "Number of milliseconds threads needed to wait for memory to be allocated through memory manager"),
- CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits"),
- WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution"),
- RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()"),
- OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections"),
- QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated"),
- HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver"),
+ MEMORY_CHUNK_BYTES("mc", "Number of bytes allocated by the memory manager",LogLevel.DEBUG, PLong.INSTANCE),
+ MEMORY_WAIT_TIME("mw", "Number of milliseconds threads needed to wait for memory to be allocated through memory manager",LogLevel.DEBUG, PLong.INSTANCE),
+ CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits",LogLevel.DEBUG, PLong.INSTANCE),
+ WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE),
+ RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE),
+ OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE),
+ QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE),
+ HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver",LogLevel.OFF, PLong.INSTANCE),
PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix connections prevented from opening " +
- "because there are already too many to that target cluster."),
- PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix connections, whether successful or not."),
+ "because there are already too many to that target cluster.",LogLevel.OFF, PLong.INSTANCE),
+ PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix connections, whether successful or not.",LogLevel.OFF, PLong.INSTANCE),
// hbase metrics
- COUNT_RPC_CALLS("rp", "Number of RPC calls"),
- COUNT_REMOTE_RPC_CALLS("rr", "Number of remote RPC calls"),
- COUNT_MILLS_BETWEEN_NEXTS("n", "Sum of milliseconds between sequential next calls"),
- COUNT_NOT_SERVING_REGION_EXCEPTION("nsr", "Number of NotServingRegionException caught"),
- COUNT_BYTES_REGION_SERVER_RESULTS("rs", "Number of bytes in Result objects from region servers"),
- COUNT_BYTES_IN_REMOTE_RESULTS("rrs", "Number of bytes in Result objects from remote region servers"),
- COUNT_SCANNED_REGIONS("rg", "Number of regions scanned"),
- COUNT_RPC_RETRIES("rpr", "Number of RPC retries"),
- COUNT_REMOTE_RPC_RETRIES("rrr", "Number of remote RPC retries"),
- COUNT_ROWS_SCANNED("ws", "Number of rows scanned"),
- COUNT_ROWS_FILTERED("wf", "Number of rows filtered");
+ COUNT_RPC_CALLS("rp", "Number of RPC calls",LogLevel.DEBUG, PLong.INSTANCE),
+ COUNT_REMOTE_RPC_CALLS("rr", "Number of remote RPC calls",LogLevel.DEBUG, PLong.INSTANCE),
+ COUNT_MILLS_BETWEEN_NEXTS("n", "Sum of milliseconds between sequential next calls",LogLevel.DEBUG, PLong.INSTANCE),
+ COUNT_NOT_SERVING_REGION_EXCEPTION("nsr", "Number of NotServingRegionException caught",LogLevel.DEBUG, PLong.INSTANCE),
+ COUNT_BYTES_REGION_SERVER_RESULTS("rs", "Number of bytes in Result objects from region servers",LogLevel.DEBUG, PLong.INSTANCE),
+ COUNT_BYTES_IN_REMOTE_RESULTS("rrs", "Number of bytes in Result objects from remote region servers",LogLevel.DEBUG, PLong.INSTANCE),
+ COUNT_SCANNED_REGIONS("rg", "Number of regions scanned",LogLevel.DEBUG, PLong.INSTANCE),
+ COUNT_RPC_RETRIES("rpr", "Number of RPC retries",LogLevel.DEBUG, PLong.INSTANCE),
+ COUNT_REMOTE_RPC_RETRIES("rrr", "Number of remote RPC retries",LogLevel.DEBUG, PLong.INSTANCE),
+ COUNT_ROWS_SCANNED("ws", "Number of rows scanned",LogLevel.DEBUG, PLong.INSTANCE),
+ COUNT_ROWS_FILTERED("wf", "Number of rows filtered",LogLevel.DEBUG,PLong.INSTANCE);
private final String description;
private final String shortName;
+ private LogLevel logLevel;
+ private PDataType dataType;
- private MetricType(String shortName, String description) {
+ private MetricType(String shortName, String description, LogLevel logLevel, PDataType dataType) {
this.shortName = shortName;
this.description = description;
+ this.logLevel=logLevel;
+ this.dataType=dataType;
}
public String description() {
@@ -82,5 +94,34 @@ public enum MetricType {
public String shortName() {
return shortName;
}
+
+ public LogLevel logLevel() {
+ return logLevel;
+ }
+
+ public PDataType dataType() {
+ return dataType;
+ }
+
+ public String columnName() {
+ return name();
+ }
+
+ public boolean isLoggingEnabled(LogLevel connectionLogLevel){
+ return logLevel() != LogLevel.OFF && (logLevel().ordinal() <= connectionLogLevel.ordinal());
+ }
+ public static String getMetricColumnsDetails() {
+ StringBuilder buffer=new StringBuilder();
+ for(MetricType metric:MetricType.values()){
+ if (metric.logLevel() != LogLevel.OFF) {
+ buffer.append(metric.columnName());
+ buffer.append(" ");
+ buffer.append(metric.dataType.getSqlTypeName());
+ buffer.append(",");
+ }
+ }
+ return buffer.toString();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java
new file mode 100644
index 0000000..1e5ac08
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.phoenix.log.LogLevel;
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+
+public class MetricUtil {
+
+ public static CombinableMetric getCombinableMetric(LogLevel connectionLogLevel, MetricType type) {
+ if (!type.isLoggingEnabled(connectionLogLevel)) { return NoOpRequestMetric.INSTANCE; }
+ return new CombinableMetricImpl(type);
+ }
+
+}