You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/04/13 22:39:53 UTC
[04/20] phoenix git commit: PHOENIX-2715 Query Log (Ankit Singhal)
PHOENIX-2715 Query Log (Ankit Singhal)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b291068b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b291068b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b291068b
Branch: refs/heads/4.x-cdh5.11
Commit: b291068bc9c6e133c7bcb6dfe52dd4f96a76f2a1
Parents: 4d9cc92
Author: Ankit Singhal <an...@gmail.com>
Authored: Tue Apr 10 07:53:31 2018 +0100
Committer: Pedro Boado <pb...@apache.org>
Committed: Fri Apr 13 23:26:16 2018 +0100
----------------------------------------------------------------------
phoenix-core/pom.xml | 5 +
.../end2end/QueryDatabaseMetaDataIT.java | 4 +
.../apache/phoenix/end2end/QueryLoggerIT.java | 358 +++++++++++++++++++
.../end2end/TenantSpecificTablesDDLIT.java | 2 +
.../phoenix/compile/StatementContext.java | 10 +
.../phoenix/coprocessor/MetaDataProtocol.java | 5 +-
.../phoenix/iterate/ScanningResultIterator.java | 18 +-
.../apache/phoenix/jdbc/PhoenixConnection.java | 17 +-
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 18 +
.../phoenix/jdbc/PhoenixPreparedStatement.java | 11 +-
.../apache/phoenix/jdbc/PhoenixResultSet.java | 38 ++
.../apache/phoenix/jdbc/PhoenixStatement.java | 67 +++-
.../java/org/apache/phoenix/log/LogLevel.java | 22 ++
.../java/org/apache/phoenix/log/LogWriter.java | 51 +++
.../log/QueryLogDetailsEventHandler.java | 63 ++++
.../org/apache/phoenix/log/QueryLogInfo.java | 87 +++++
.../org/apache/phoenix/log/QueryLogState.java | 22 ++
.../org/apache/phoenix/log/QueryLogger.java | 145 ++++++++
.../log/QueryLoggerDefaultExceptionHandler.java | 51 +++
.../phoenix/log/QueryLoggerDisruptor.java | 117 ++++++
.../org/apache/phoenix/log/QueryLoggerUtil.java | 62 ++++
.../org/apache/phoenix/log/RingBufferEvent.java | 93 +++++
.../phoenix/log/RingBufferEventTranslator.java | 53 +++
.../org/apache/phoenix/log/TableLogWriter.java | 125 +++++++
.../phoenix/monitoring/ReadMetricQueue.java | 44 ++-
.../phoenix/monitoring/ScanMetricsHolder.java | 48 ++-
.../phoenix/query/ConnectionQueryServices.java | 6 +
.../query/ConnectionQueryServicesImpl.java | 35 +-
.../query/ConnectionlessQueryServicesImpl.java | 18 +
.../query/DelegateConnectionQueryServices.java | 14 +
.../apache/phoenix/query/QueryConstants.java | 45 +++
.../org/apache/phoenix/query/QueryServices.java | 4 +
.../phoenix/query/QueryServicesOptions.java | 9 +-
pom.xml | 6 +
34 files changed, 1612 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index b07cbbb..8fe8a10 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -526,5 +526,10 @@
<artifactId>i18n-util</artifactId>
<version>${i18n-util.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${disruptor.version}</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
index a1bcf40..54cb5da 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
@@ -166,6 +166,10 @@ public class QueryDatabaseMetaDataIT extends ParallelStatsDisabledIT {
assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
assertTrue(rs.next());
assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
+ assertEquals(PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE, rs.getString("TABLE_NAME"));
+ assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
+ assertTrue(rs.next());
+ assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
assertEquals(TYPE_SEQUENCE, rs.getString("TABLE_NAME"));
assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
assertTrue(rs.next());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
new file mode 100644
index 0000000..940ba6f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITERATED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.InetAddress;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.log.LogLevel;
+import org.apache.phoenix.log.QueryLogState;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
+
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ // Enable request metric collection at the driver level
+ props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
+ // disable renewing leases as this will force spooling to happen.
+ props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ // need the non-test driver for some tests that check number of hconnections, etc.
+ DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+ }
+
+
+ @Test
+ public void testDebugLogs() throws Exception {
+ String tableName = generateUniqueName();
+ createTableAndInsertValues(tableName, true);
+ Properties props= new Properties();
+ props.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name());
+ Connection conn = DriverManager.getConnection(getUrl(),props);
+ assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.DEBUG);
+ String query = "SELECT * FROM " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ StatementContext context = ((PhoenixResultSet)rs).getContext();
+ String queryId = context.getQueryLogger().getQueryId();
+ while (rs.next()) {
+ rs.getString(1);
+ rs.getString(2);
+ }
+ ResultSet explainRS = conn.createStatement().executeQuery("Explain " + query);
+
+ String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
+ rs = conn.createStatement().executeQuery(logQuery);
+ boolean foundQueryLog = false;
+ int delay = 5000;
+
+ // sleep for sometime to let query log committed
+ Thread.sleep(delay);
+ while (rs.next()) {
+ if (rs.getString(QUERY_ID).equals(queryId)) {
+ foundQueryLog = true;
+ assertEquals(rs.getString(BIND_PARAMETERS), null);
+ assertEquals(rs.getString(USER), System.getProperty("user.name"));
+ assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress());
+ assertEquals(rs.getString(EXPLAIN_PLAN), QueryUtil.getExplainPlan(explainRS));
+ assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON());
+ assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10);
+ assertEquals(rs.getString(QUERY), query);
+ assertEquals(rs.getString(QUERY_STATUS), QueryLogState.COMPLETED.toString());
+ assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay);
+ assertEquals(rs.getString(TENANT_ID), null);
+ assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
+ assertTrue(rs.getString(SCAN_METRICS_JSON).contains("scanMetrics"));
+ assertEquals(rs.getString(EXCEPTION_TRACE),null);
+ }else{
+ //confirm we are not logging system queries
+ assertFalse(rs.getString(QUERY).toString().contains(SYSTEM_CATALOG_SCHEMA));
+ }
+ }
+ assertTrue(foundQueryLog);
+ conn.close();
+ }
+
+ @Test
+ public void testLogSampling() throws Exception {
+ String tableName = generateUniqueName();
+ createTableAndInsertValues(tableName, true);
+ Properties props= new Properties();
+ props.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name());
+ props.setProperty(QueryServices.LOG_SAMPLE_RATE, "0.5");
+ Connection conn = DriverManager.getConnection(getUrl(),props);
+ assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.DEBUG);
+ String query = "SELECT * FROM " + tableName;
+ int count=100;
+ for (int i = 0; i < count; i++) {
+ conn.createStatement().executeQuery(query);
+ }
+
+ String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
+ ResultSet rs = conn.createStatement().executeQuery(logQuery);
+ int delay = 5000;
+
+ // sleep for sometime to let query log committed
+ Thread.sleep(delay);
+ int logCount=0;
+ while (rs.next()) {
+ logCount++;
+ }
+
+ //sampling rate is 0.5 , but with lesser count, uniformity of thread random may not be perfect, so taking 0.75 for comparison
+ assertTrue(logCount != 0 && logCount < count * 0.75);
+ conn.close();
+ }
+
+ @Test
+ public void testInfoLogs() throws Exception{
+ String tableName = generateUniqueName();
+ createTableAndInsertValues(tableName, true);
+ Properties props= new Properties();
+ props.setProperty(QueryServices.LOG_LEVEL, LogLevel.INFO.name());
+ Connection conn = DriverManager.getConnection(getUrl(),props);
+ assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.INFO);
+ String query = "SELECT * FROM " + tableName;
+
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ StatementContext context = ((PhoenixResultSet)rs).getContext();
+ String queryId = context.getQueryLogger().getQueryId();
+ while (rs.next()) {
+ rs.getString(1);
+ rs.getString(2);
+ }
+
+ String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
+ rs = conn.createStatement().executeQuery(logQuery);
+ boolean foundQueryLog = false;
+ int delay = 5000;
+
+ // sleep for sometime to let query log committed
+ Thread.sleep(delay);
+ while (rs.next()) {
+ if (rs.getString(QUERY_ID).equals(queryId)) {
+ foundQueryLog = true;
+ assertEquals(rs.getString(USER), System.getProperty("user.name"));
+ assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress());
+ assertEquals(rs.getString(EXPLAIN_PLAN), null);
+ assertEquals(rs.getString(GLOBAL_SCAN_DETAILS),null);
+ assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0);
+ assertEquals(rs.getString(QUERY), query);
+ assertEquals(rs.getString(QUERY_STATUS),null);
+ assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay);
+ assertEquals(rs.getString(TENANT_ID), null);
+ assertTrue(rs.getString(TOTAL_EXECUTION_TIME) == null);
+ }
+ }
+ assertTrue(foundQueryLog);
+ conn.close();
+ }
+
+ @Test
+ public void testWithLoggingOFF() throws Exception{
+ String tableName = generateUniqueName();
+ createTableAndInsertValues(tableName, true);
+ Properties props= new Properties();
+ props.setProperty(QueryServices.LOG_LEVEL, LogLevel.OFF.name());
+ Connection conn = DriverManager.getConnection(getUrl(),props);
+ assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.OFF);
+ String query = "SELECT * FROM " + tableName;
+
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ StatementContext context = ((PhoenixResultSet)rs).getContext();
+ String queryId = context.getQueryLogger().getQueryId();
+ while (rs.next()) {
+ rs.getString(1);
+ rs.getString(2);
+ }
+
+ String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
+ rs = conn.createStatement().executeQuery(logQuery);
+ boolean foundQueryLog = false;
+ int delay = 5000;
+
+ // sleep for sometime to let query log committed
+ Thread.sleep(delay);
+ while (rs.next()) {
+ if (rs.getString(QUERY_ID).equals(queryId)) {
+ foundQueryLog = true;
+ }
+ }
+ assertFalse(foundQueryLog);
+ conn.close();
+ }
+
+
+ @Test
+ public void testPreparedStatementWithTrace() throws Exception{
+ testPreparedStatement(LogLevel.TRACE);
+ }
+
+ @Test
+ public void testPreparedStatementWithDebug() throws Exception{
+ testPreparedStatement(LogLevel.DEBUG);
+ }
+
+ private void testPreparedStatement(LogLevel loglevel) throws Exception{
+ String tableName = generateUniqueName();
+ createTableAndInsertValues(tableName, true);
+ Properties props= new Properties();
+ props.setProperty(QueryServices.LOG_LEVEL, loglevel.name());
+ Connection conn = DriverManager.getConnection(getUrl(),props);
+ assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),loglevel);
+
+ String query = "SELECT * FROM " + tableName +" where V = ?";
+
+ PreparedStatement pstmt = conn.prepareStatement(query);
+ pstmt.setString(1, "value5");
+ ResultSet rs = pstmt.executeQuery();
+ StatementContext context = ((PhoenixResultSet)rs).getContext();
+ String queryId = context.getQueryLogger().getQueryId();
+ while (rs.next()) {
+ rs.getString(1);
+ rs.getString(2);
+ }
+ ResultSet explainRS = conn.createStatement()
+ .executeQuery("Explain " + "SELECT * FROM " + tableName + " where V = 'value5'");
+ String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
+ rs = conn.createStatement().executeQuery(logQuery);
+ boolean foundQueryLog = false;
+ int delay = 5000;
+
+ // sleep for sometime to let query log committed
+ Thread.sleep(delay);
+ while (rs.next()) {
+ if (rs.getString(QUERY_ID).equals(queryId)) {
+ foundQueryLog = true;
+ assertEquals(rs.getString(BIND_PARAMETERS), loglevel == LogLevel.TRACE ? "value5" : null);
+ assertEquals(rs.getString(USER), System.getProperty("user.name"));
+ assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress());
+ assertEquals(rs.getString(EXPLAIN_PLAN), QueryUtil.getExplainPlan(explainRS));
+ assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON());
+ assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 1);
+ assertEquals(rs.getString(QUERY), query);
+ assertEquals(rs.getString(QUERY_STATUS), QueryLogState.COMPLETED.toString());
+ assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay);
+ assertEquals(rs.getString(TENANT_ID), null);
+ assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
+ }
+ }
+ assertTrue(foundQueryLog);
+ conn.close();
+ }
+
+
+
+ @Test
+ public void testFailedQuery() throws Exception {
+ String tableName = generateUniqueName();
+ Properties props = new Properties();
+ props.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name());
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(), LogLevel.DEBUG);
+ // Table does not exists
+ String query = "SELECT * FROM " + tableName;
+
+ try {
+ conn.createStatement().executeQuery(query);
+ fail();
+ } catch (SQLException e) {
+ assertEquals(e.getErrorCode(), SQLExceptionCode.TABLE_UNDEFINED.getErrorCode());
+ }
+ String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"";
+ ResultSet rs = conn.createStatement().executeQuery(logQuery);
+ boolean foundQueryLog = false;
+ int delay = 5000;
+
+ // sleep for sometime to let query log committed
+ Thread.sleep(delay);
+ while (rs.next()) {
+ if (QueryLogState.FAILED.name().equals(rs.getString(QUERY_STATUS))) {
+ foundQueryLog = true;
+ assertEquals(rs.getString(USER), System.getProperty("user.name"));
+ assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress());
+ assertEquals(rs.getString(EXPLAIN_PLAN), null);
+ assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), null);
+ assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0);
+ assertEquals(rs.getString(QUERY), query);
+ assertTrue(rs.getString(EXCEPTION_TRACE).contains(SQLExceptionCode.TABLE_UNDEFINED.getMessage()));
+ assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay);
+ assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
+ }
+ }
+ assertTrue(foundQueryLog);
+ conn.close();
+ }
+
+ private static void createTableAndInsertValues(String tableName, boolean resetGlobalMetricsAfterTableCreate)
+ throws Exception {
+ String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute(ddl);
+ // executing 10 upserts/mutations.
+ String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ for (int i = 1; i <= 10; i++) {
+ stmt.setString(1, "key" + i);
+ stmt.setString(2, "value" + i);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index f8dfd65..34a1312 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -493,6 +493,8 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
assertTrue(rs.next());
assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, SYSTEM_FUNCTION_TABLE, SYSTEM);
assertTrue(rs.next());
+ assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE, PTableType.SYSTEM);
+ assertTrue(rs.next());
assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE, PTableType.SYSTEM);
assertTrue(rs.next());
assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, PTableType.SYSTEM);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 39d8525..c105046 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.log.QueryLogger;
import org.apache.phoenix.monitoring.OverAllQueryMetrics;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.parse.SelectStatement;
@@ -83,6 +84,7 @@ public class StatementContext {
private Map<SelectStatement, Object> subqueryResults;
private final ReadMetricQueue readMetricsQueue;
private final OverAllQueryMetrics overAllQueryMetrics;
+ private QueryLogger queryLogger;
public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
@@ -306,5 +308,13 @@ public class StatementContext {
public OverAllQueryMetrics getOverallQueryMetrics() {
return overAllQueryMetrics;
}
+
+ public void setQueryLogger(QueryLogger queryLogger) {
+ this.queryLogger=queryLogger;
+ }
+
+ public QueryLogger getQueryLogger() {
+ return queryLogger;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index efad1e7..4c4c96f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -76,6 +76,8 @@ public abstract class MetaDataProtocol extends MetaDataService {
public static final boolean DEFAULT_META_DATA_KEEP_DELETED_CELLS = true;
public static final int DEFAULT_MAX_STAT_DATA_VERSIONS = 1;
public static final boolean DEFAULT_STATS_KEEP_DELETED_CELLS = false;
+ public static final int DEFAULT_LOG_VERSIONS = 10;
+ public static final int DEFAULT_LOG_TTL = 7 * 24 * 60 * 60; // 7 days
// Min system table timestamps for every release.
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 = MIN_TABLE_TIMESTAMP + 3;
@@ -118,7 +120,8 @@ public abstract class MetaDataProtocol extends MetaDataService {
TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0, "4.14.x");
}
- public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION + "." + PHOENIX_MINOR_VERSION + "." + PHOENIX_PATCH_NUMBER;
+ public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION + "." + PHOENIX_MINOR_VERSION + "." + PHOENIX_PATCH_NUMBER;
+
// TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need
// a different code for every type of error.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 011feaa..9a31238 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -17,18 +17,18 @@
*/
package org.apache.phoenix.iterate;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_CALLS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_NOT_SERVING_REGION_EXCEPTION;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_CALLS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_RETRIES;
-import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_SCANNED;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_FILTERED;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_SCANNED;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
import java.io.IOException;
import java.sql.SQLException;
@@ -93,7 +93,7 @@ public class ScanningResultIterator implements ResultIterator {
if(scanMetricsMap == null) {
return;
}
-
+ scanMetricsHolder.setScanMetricMap(scanMetricsMap);
changeMetric(scanMetricsHolder.getCountOfRPCcalls(),
scanMetricsMap.get(RPC_CALLS_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfRemoteRPCcalls(),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 2b428c9..d3626f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -75,11 +75,11 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.iterate.TableResultIteratorFactory;
import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser;
+import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.ConnectionQueryServices.Feature;
import org.apache.phoenix.query.DelegateConnectionQueryServices;
import org.apache.phoenix.query.MetaDataMutated;
import org.apache.phoenix.query.PropertyPolicyProvider;
@@ -168,6 +168,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
private final LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue;
private TableResultIteratorFactory tableResultIteratorFactory;
private boolean isRunningUpgrade;
+ private LogLevel logLevel;
+ private Double logSamplingRate;
static {
Tracing.addTraceMetricsSource();
@@ -378,6 +380,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
this.scannerQueue = new LinkedBlockingQueue<>();
this.tableResultIteratorFactory = new DefaultTableResultIteratorFactory();
this.isRunningUpgrade = isRunningUpgrade;
+ this.logLevel= LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL,
+ QueryServicesOptions.DEFAULT_LOGGING_LEVEL));
+ this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE,
+ QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE));
GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
}
@@ -648,6 +654,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
} finally {
services.removeConnection(this);
}
+
} finally {
isClosed = true;
GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
@@ -1274,4 +1281,12 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
this.isRunningUpgrade = isRunningUpgrade;
}
+ public LogLevel getLogLevel(){
+ return this.logLevel;
+ }
+
+ public Double getLogSamplingRate(){
+ return this.logSamplingRate;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index b88b381..9caf7fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -343,6 +343,24 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final String USE_STATS_FOR_PARALLELIZATION = "USE_STATS_FOR_PARALLELIZATION";
public static final byte[] USE_STATS_FOR_PARALLELIZATION_BYTES = Bytes.toBytes(USE_STATS_FOR_PARALLELIZATION);
+
+ //SYSTEM:LOG
+ public static final String SYSTEM_LOG_TABLE = "LOG";
+ public static final String QUERY_ID = "QUERY_ID";
+ public static final String USER = "USER";
+ public static final String CLIENT_IP = "CLIENT_IP";
+ public static final String QUERY = "QUERY";
+ public static final String EXPLAIN_PLAN = "EXPLAIN_PLAN";
+ public static final String TOTAL_EXECUTION_TIME = "TOTAL_EXECUTION_TIME";
+ public static final String NO_OF_RESULTS_ITERATED = "NO_OF_RESULTS_ITERATED";
+ public static final String QUERY_STATUS = "QUERY_STATUS";
+ public static final String EXCEPTION_TRACE = "EXCEPTION_TRACE";
+ public static final String GLOBAL_SCAN_DETAILS = "GLOBAL_SCAN_DETAILS";
+ public static final String SCAN_METRICS_JSON = "SCAN_METRICS_JSON";
+ public static final String START_TIME = "START_TIME";
+ public static final String BIND_PARAMETERS = "BIND_PARAMETERS";
+
+
PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
this.connection = connection;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index 71ecb8d..914ea33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -169,7 +169,13 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
.build().buildException();
}
- return execute(statement);
+ if (statement.getOperation().isMutation()) {
+ executeMutation(statement);
+ return false;
+ }
+ executeQuery(statement, createQueryLogger(statement,query));
+ return true;
+
}
@Override
@@ -183,7 +189,8 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
if (statement.getOperation().isMutation()) {
throw new ExecuteQueryNotApplicableException(statement.getOperation());
}
- return executeQuery(statement);
+
+ return executeQuery(statement,createQueryLogger(statement,query));
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index d3ec151..153fa08 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -50,6 +50,9 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.log.QueryLogInfo;
+import org.apache.phoenix.log.QueryLogState;
+import org.apache.phoenix.log.QueryLogger;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.monitoring.OverAllQueryMetrics;
import org.apache.phoenix.monitoring.ReadMetricQueue;
@@ -72,6 +75,9 @@ import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.SQLCloseable;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
@@ -122,6 +128,14 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
private boolean isClosed = false;
private boolean wasNull = false;
private boolean firstRecordRead = false;
+
+ private QueryLogger queryLogger;
+
+ private Long count = 0L;
+
+ private QueryLogState logStatus = QueryLogState.COMPLETED;
+
+ private RuntimeException exception;
public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, StatementContext ctx) throws SQLException {
this.rowProjector = rowProjector;
@@ -130,6 +144,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
this.statement = context.getStatement();
this.readMetricsQueue = context.getReadMetricsQueue();
this.overAllQueryMetrics = context.getOverallQueryMetrics();
+ this.queryLogger = context.getQueryLogger();
}
@Override
@@ -779,17 +794,39 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
currentRow = scanner.next();
if (currentRow == null) {
close();
+ }else{
+ count++;
}
rowProjector.reset();
} catch (RuntimeException e) {
+ this.logStatus=QueryLogState.FAILED;
// FIXME: Expression.evaluate does not throw SQLException
// so this will unwrap throws from that.
+ this.exception = e;
if (e.getCause() instanceof SQLException) {
throw (SQLException) e.getCause();
}
throw e;
+ }finally{
+ if (currentRow == null && queryLogger != null ) {
+ if (queryLogger.isDebugEnabled()) {
+ Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
+ queryLogBuilder.put(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count);
+ queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I,
+ System.currentTimeMillis() - queryLogger.getStartTime());
+ queryLogBuilder.put(QueryLogInfo.SCAN_METRICS_JSON_I,
+ readMetricsQueue.getScanMetricsHolderList().toString());
+ if (this.exception != null) {
+ queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I,
+ Throwables.getStackTraceAsString(this.exception));
+ }
+ readMetricsQueue.getScanMetricsHolderList().clear();
+ queryLogger.log(logStatus, queryLogBuilder.build());
+ }
+ }
}
if (currentRow == null) {
+
overAllQueryMetrics.endQuery();
overAllQueryMetrics.stopResultSetWatch();
}
@@ -1301,6 +1338,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
public void resetMetrics() {
readMetricsQueue.clearMetrics();
+ readMetricsQueue.getScanMetricsHolderList().clear();
overAllQueryMetrics.reset();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 4a692d3..f526419 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -92,6 +92,10 @@ import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.iterate.MaterializedResultIterator;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.log.QueryLogInfo;
+import org.apache.phoenix.log.QueryLogState;
+import org.apache.phoenix.log.QueryLogger;
+import org.apache.phoenix.log.QueryLoggerUtil;
import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.AddColumnStatement;
import org.apache.phoenix.parse.AddJarsStatement;
@@ -186,6 +190,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.math.IntMath;
@@ -269,26 +275,19 @@ public class PhoenixStatement implements Statement, SQLCloseable {
return new PhoenixResultSet(iterator, projector, context);
}
- protected boolean execute(final CompilableStatement stmt) throws SQLException {
- if (stmt.getOperation().isMutation()) {
- executeMutation(stmt);
- return false;
- }
- executeQuery(stmt);
- return true;
- }
-
protected QueryPlan optimizeQuery(CompilableStatement stmt) throws SQLException {
QueryPlan plan = stmt.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE);
return connection.getQueryServices().getOptimizer().optimize(this, plan);
}
- protected PhoenixResultSet executeQuery(final CompilableStatement stmt) throws SQLException {
- return executeQuery(stmt,true);
+ protected PhoenixResultSet executeQuery(final CompilableStatement stmt, final QueryLogger queryLogger)
+ throws SQLException {
+ return executeQuery(stmt, true, queryLogger);
}
private PhoenixResultSet executeQuery(final CompilableStatement stmt,
- final boolean doRetryOnMetaNotFoundError) throws SQLException {
+ final boolean doRetryOnMetaNotFoundError, final QueryLogger queryLogger) throws SQLException {
GLOBAL_SELECT_SQL_COUNTER.increment();
+
try {
return CallRunner.run(
new CallRunner.CallableThrowable<PhoenixResultSet, SQLException>() {
@@ -297,6 +296,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
final long startTime = System.currentTimeMillis();
try {
PhoenixConnection conn = getConnection();
+
if (conn.getQueryServices().isUpgradeRequired() && !conn.isRunningUpgrade()
&& stmt.getOperation() != Operation.UPGRADE) {
throw new UpgradeRequiredException();
@@ -317,6 +317,13 @@ public class PhoenixStatement implements Statement, SQLCloseable {
logger.debug(LogUtil.addCustomAnnotations("Explain plan: " + explainPlan, connection));
}
StatementContext context = plan.getContext();
+ context.setQueryLogger(queryLogger);
+ if(queryLogger.isDebugEnabled()){
+ Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
+ queryLogBuilder.put(QueryLogInfo.EXPLAIN_PLAN_I, QueryUtil.getExplainPlan(resultIterator));
+ queryLogBuilder.put(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, context.getScan()!=null?context.getScan().toString():null);
+ queryLogger.log(QueryLogState.COMPILED, queryLogBuilder.build());
+ }
context.getOverallQueryMetrics().startQuery();
PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), plan.getContext());
resultSets.add(rs);
@@ -338,7 +345,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
logger.debug("Reloading table "+ e.getTableName()+" data from server");
if(new MetaDataClient(connection).updateCache(connection.getTenantId(),
e.getSchemaName(), e.getTableName(), true).wasUpdated()){
- return executeQuery(stmt, false);
+ //TODO we can log retry count and error for debugging in LOG table
+ return executeQuery(stmt, false, queryLogger);
}
}
throw e;
@@ -358,6 +366,13 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
}, PhoenixContextExecutor.inContext());
}catch (Exception e) {
+ if (queryLogger.isDebugEnabled()) {
+ Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
+ queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I,
+ System.currentTimeMillis() - queryLogger.getStartTime());
+ queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
+ queryLogger.log(QueryLogState.FAILED, queryLogBuilder.build());
+ }
Throwables.propagateIfInstanceOf(e, SQLException.class);
Throwables.propagate(e);
throw new IllegalStateException(); // Can't happen as Throwables.propagate() always throws
@@ -1750,16 +1765,37 @@ public class PhoenixStatement implements Statement, SQLCloseable {
return compileMutation(stmt, sql);
}
+ public QueryLogger createQueryLogger(CompilableStatement stmt, String sql) throws SQLException {
+ boolean isSystemTable=false;
+ if(stmt instanceof ExecutableSelectStatement){
+ TableNode from = ((ExecutableSelectStatement)stmt).getFrom();
+ if(from instanceof NamedTableNode){
+ String schemaName = ((NamedTableNode)from).getName().getSchemaName();
+ if(schemaName==null){
+ schemaName=connection.getSchema();
+ }
+ if(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(schemaName)){
+ isSystemTable=true;
+ }
+ }
+ }
+ QueryLogger queryLogger = QueryLogger.getInstance(connection,isSystemTable);
+ QueryLoggerUtil.logInitialDetails(queryLogger, connection.getTenantId(),
+ connection.getQueryServices(), sql, queryLogger.getStartTime(), getParameters());
+ return queryLogger;
+ }
+
@Override
public ResultSet executeQuery(String sql) throws SQLException {
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Execute query: " + sql, connection));
}
+
CompilableStatement stmt = parseStatement(sql);
if (stmt.getOperation().isMutation()) {
throw new ExecuteQueryNotApplicableException(sql);
}
- return executeQuery(stmt);
+ return executeQuery(stmt,createQueryLogger(stmt,sql));
}
@Override
@@ -1795,7 +1831,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
flushIfNecessary();
return false;
}
- executeQuery(stmt);
+
+ executeQuery(stmt,createQueryLogger(stmt,sql));
return true;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
new file mode 100644
index 0000000..5792658
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+public enum LogLevel {
+ OFF, INFO, DEBUG, TRACE
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
new file mode 100644
index 0000000..817f9ec
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+/**
+ * Used by the event handler to write RingBufferEvent, this is done in a seperate thread from the application configured
+ * during disruptor
+ */
+public interface LogWriter {
+ /**
+ * Called by ring buffer event handler to write RingBufferEvent
+ *
+ * @param event
+ * @throws SQLException
+ * @throws IOException
+ */
+ void write(RingBufferEvent event) throws SQLException, IOException;
+
+ /**
+ * will be called when disruptor is getting shutdown
+ *
+ * @throws IOException
+ */
+
+ void close() throws IOException;
+
+ /**
+ * if writer is closed and cannot write further event
+ *
+ * @return
+ */
+ boolean isClosed();
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java
new file mode 100644
index 0000000..ee6b2d6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.Sequence;
+import com.lmax.disruptor.SequenceReportingEventHandler;
+
+
+public class QueryLogDetailsEventHandler implements SequenceReportingEventHandler<RingBufferEvent>, LifecycleAware {
+ private Sequence sequenceCallback;
+ private LogWriter logWriter;
+
+ public QueryLogDetailsEventHandler(Configuration configuration) throws SQLException{
+ this.logWriter = new TableLogWriter(configuration);
+ }
+
+ @Override
+ public void setSequenceCallback(final Sequence sequenceCallback) {
+ this.sequenceCallback = sequenceCallback;
+ }
+
+ @Override
+ public void onEvent(final RingBufferEvent event, final long sequence, final boolean endOfBatch) throws Exception {
+ logWriter.write(event);
+ event.clear();
+ }
+
+ @Override
+ public void onStart() {
+ }
+
+ @Override
+ public void onShutdown() {
+ try {
+ if (logWriter != null) {
+ logWriter.close();
+ }
+ } catch (Exception e) {
+ //Ignore
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
new file mode 100644
index 0000000..87de267
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITERATED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
+
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.schema.types.PVarchar;
+
+
+public enum QueryLogInfo {
+
+ CLIENT_IP_I(CLIENT_IP, QueryLogState.STARTED, LogLevel.INFO, PVarchar.INSTANCE),
+ QUERY_I(QUERY,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
+ BIND_PARAMETERS_I(BIND_PARAMETERS,QueryLogState.STARTED, LogLevel.TRACE,PVarchar.INSTANCE),
+ QUERY_ID_I(QUERY_ID,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
+ TENANT_ID_I(TENANT_ID,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
+ START_TIME_I(START_TIME,QueryLogState.STARTED, LogLevel.INFO,PTimestamp.INSTANCE),
+ USER_I(USER,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
+ EXPLAIN_PLAN_I(EXPLAIN_PLAN,QueryLogState.COMPILED, LogLevel.DEBUG,PVarchar.INSTANCE),
+ GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS,QueryLogState.COMPILED, LogLevel.DEBUG,PVarchar.INSTANCE),
+ NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED,QueryLogState.COMPLETED, LogLevel.DEBUG,PLong.INSTANCE),
+ EXCEPTION_TRACE_I(EXCEPTION_TRACE,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE),
+ QUERY_STATUS_I(QUERY_STATUS,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE),
+ TOTAL_EXECUTION_TIME_I(TOTAL_EXECUTION_TIME,QueryLogState.COMPLETED, LogLevel.DEBUG,PLong.INSTANCE),
+ SCAN_METRICS_JSON_I(SCAN_METRICS_JSON,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE);
+
+ public final String columnName;
+ public final QueryLogState logState;
+ public final LogLevel logLevel;
+ public final PDataType dataType;
+
+ private QueryLogInfo(String columnName, QueryLogState logState, LogLevel logLevel, PDataType dataType) {
+ this.columnName = columnName;
+ this.logState=logState;
+ this.logLevel=logLevel;
+ this.dataType=dataType;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public QueryLogState getLogState() {
+ return logState;
+ }
+
+ public LogLevel getLogLevel() {
+ return logLevel;
+ }
+
+ public PDataType getDataType() {
+ return dataType;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
new file mode 100644
index 0000000..e27f0e8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+public enum QueryLogState {
+ STARTED, PLAN, COMPILED, EXECUTION, COMPLETED,FAILED
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
new file mode 100644
index 0000000..b2fb235
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+import com.google.common.collect.ImmutableMap;
+
+import io.netty.util.internal.ThreadLocalRandom;
+
+/*
+ * Wrapper for query translator
+ */
+public class QueryLogger {
+ private final ThreadLocal<RingBufferEventTranslator> threadLocalTranslator = new ThreadLocal<>();
+ private QueryLoggerDisruptor queryDisruptor;
+ private String queryId;
+ private Long startTime;
+ private LogLevel logLevel;
+ private static final Log LOG = LogFactory.getLog(QueryLoggerDisruptor.class);
+
+ private QueryLogger(PhoenixConnection connection) {
+ this.queryId = UUID.randomUUID().toString();
+ this.queryDisruptor = connection.getQueryServices().getQueryDisruptor();
+ this.startTime = System.currentTimeMillis();
+ logLevel = connection.getLogLevel();
+ }
+
+ private QueryLogger() {
+ logLevel = LogLevel.OFF;
+ }
+
+ private RingBufferEventTranslator getCachedTranslator() {
+ RingBufferEventTranslator result = threadLocalTranslator.get();
+ if (result == null) {
+ result = new RingBufferEventTranslator(queryId);
+ threadLocalTranslator.set(result);
+ }
+ return result;
+ }
+
+ private static final QueryLogger NO_OP_INSTANCE = new QueryLogger() {
+ @Override
+ public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> map) {
+
+ }
+
+ @Override
+ public boolean isDebugEnabled(){
+ return false;
+ }
+
+ @Override
+ public boolean isInfoEnabled(){
+ return false;
+ }
+ };
+
+ public static QueryLogger getInstance(PhoenixConnection connection, boolean isSystemTable) {
+ if (connection.getLogLevel() == LogLevel.OFF || isSystemTable || ThreadLocalRandom.current()
+ .nextDouble() > connection.getLogSamplingRate()) { return NO_OP_INSTANCE; }
+ return new QueryLogger(connection);
+ }
+
+ /**
+ * Add query log in the table, columns will be logged depending upon the connection logLevel
+ * @param logState State of the query
+ * @param map Value of the map should be in format of the corresponding data type
+ */
+ public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> map) {
+ final RingBufferEventTranslator translator = getCachedTranslator();
+ translator.setQueryInfo(logState, map, logLevel);
+ publishLogs(translator);
+ }
+
+ private boolean publishLogs(RingBufferEventTranslator translator) {
+ if (queryDisruptor == null) { return false; }
+ boolean isLogged = queryDisruptor.tryPublish(translator);
+ if (!isLogged && LOG.isDebugEnabled()) {
+ LOG.debug("Unable to write query log in table as ring buffer queue is full!!");
+ }
+ return isLogged;
+ }
+
+ /**
+ * Start time when the logger was started, if {@link LogLevel#OFF} then it's the current time
+ */
+ public Long getStartTime() {
+ return startTime != null ? startTime : System.currentTimeMillis();
+ }
+
+ /**
+ * Is debug logging currently enabled?
+ * Call this method to prevent having to perform expensive operations (for example, String concatenation) when the log level is more than debug.
+ */
+ public boolean isDebugEnabled(){
+ return isLevelEnabled(LogLevel.DEBUG);
+ }
+
+ private boolean isLevelEnabled(LogLevel logLevel){
+ return this.logLevel != null ? logLevel.ordinal() <= this.logLevel.ordinal() : false;
+ }
+
+ /**
+ * Is Info logging currently enabled?
+ * Call this method to prevent having to perform expensive operations (for example, String concatenation) when the log level is more than info.
+ * @return
+ */
+ public boolean isInfoEnabled(){
+ return isLevelEnabled(LogLevel.INFO);
+ }
+
+ /**
+ * Return queryId of the current query logger , needed by the application
+ * to correlate with the logging table.
+ * Eg(usage):-
+ * StatementContext context = ((PhoenixResultSet)rs).getContext();
+ * String queryId = context.getQueryLogger().getQueryId();
+ *
+ * @return
+ */
+ public String getQueryId() {
+ return this.queryId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDefaultExceptionHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDefaultExceptionHandler.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDefaultExceptionHandler.java
new file mode 100644
index 0000000..e9ae6bd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDefaultExceptionHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import com.lmax.disruptor.ExceptionHandler;
+
+class QueryLoggerDefaultExceptionHandler implements ExceptionHandler<RingBufferEvent> {
+
+ @Override
+ public void handleEventException(Throwable ex, long sequence, RingBufferEvent event) {
+ final StringBuilder sb = new StringBuilder(512);
+ sb.append("Query Logger error handling event seq=").append(sequence).append(", value='");
+ try {
+ sb.append(event);
+ } catch (final Exception ignored) {
+ sb.append("[ERROR calling ").append(event.getClass()).append(".toString(): ");
+ sb.append(ignored).append("]");
+ }
+ sb.append("':");
+ System.err.println(sb);
+ ex.printStackTrace();
+ }
+
+ @Override
+ public void handleOnStartException(final Throwable throwable) {
+ System.err.println("QueryLogger error starting:");
+ throwable.printStackTrace();
+ }
+
+ @Override
+ public void handleOnShutdownException(final Throwable throwable) {
+ System.err.println("QueryLogger error shutting down:");
+ throwable.printStackTrace();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java
new file mode 100644
index 0000000..b548d6c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.query.QueryServices;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
+public class QueryLoggerDisruptor implements Closeable{
+
+ private volatile Disruptor<RingBufferEvent> disruptor;
+ private boolean isClosed = false;
+ //number of elements to create within the ring buffer.
+ private static final int RING_BUFFER_SIZE = 256 * 1024;
+ private static final Log LOG = LogFactory.getLog(QueryLoggerDisruptor.class);
+ private static final String DEFAULT_WAIT_STRATEGY = BlockingWaitStrategy.class.getName();
+
+ public QueryLoggerDisruptor(Configuration configuration) throws SQLException{
+ WaitStrategy waitStrategy;
+ try {
+ waitStrategy = (WaitStrategy)Class
+ .forName(configuration.get(QueryServices.LOG_BUFFER_WAIT_STRATEGY, DEFAULT_WAIT_STRATEGY)).newInstance();
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+ throw new SQLException(e);
+ }
+
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("QueryLogger" + "-thread-%s")
+ .setDaemon(true)
+ .setThreadFactory(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ final Thread result = Executors.defaultThreadFactory().newThread(r);
+ result.setContextClassLoader(QueryLoggerDisruptor.class.getClass().getClassLoader());
+ return result;
+ }
+ })
+ .build();
+ disruptor = new Disruptor<RingBufferEvent>(RingBufferEvent.FACTORY,
+ configuration.getInt(QueryServices.LOG_BUFFER_SIZE, RING_BUFFER_SIZE), threadFactory, ProducerType.MULTI,
+ waitStrategy);
+ final ExceptionHandler<RingBufferEvent> errorHandler = new QueryLoggerDefaultExceptionHandler();
+ disruptor.setDefaultExceptionHandler(errorHandler);
+
+ final QueryLogDetailsEventHandler[] handlers = { new QueryLogDetailsEventHandler(configuration) };
+ disruptor.handleEventsWith(handlers);
+ LOG.info("Starting QueryLoggerDisruptor for with ringbufferSize=" + disruptor.getRingBuffer().getBufferSize()
+ + ", waitStrategy=" + waitStrategy.getClass().getSimpleName() + ", " + "exceptionHandler="
+ + errorHandler + "...");
+ disruptor.start();
+
+ }
+
+ /**
+ * Attempts to publish an event by translating (write) data representations into events claimed from the RingBuffer.
+ * @param translator
+ * @return
+ */
+ public boolean tryPublish(final EventTranslator<RingBufferEvent> translator) {
+ if(isClosed()){
+ return false;
+ }
+ return disruptor.getRingBuffer().tryPublishEvent(translator);
+ }
+
+
+ public boolean isClosed() {
+ return isClosed ;
+ }
+
+ @Override
+ public void close() throws IOException {
+ isClosed = true;
+ LOG.info("Shutting down QueryLoggerDisruptor..");
+ try {
+ //we can wait for 2 seconds, so that backlog can be committed
+ disruptor.shutdown(2, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ throw new IOException(e);
+ }
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
new file mode 100644
index 0000000..2f22931
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PName;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+
+public class QueryLoggerUtil {
+
+ public static void logInitialDetails(QueryLogger queryLogger, PName tenantId,
+ ConnectionQueryServices queryServices, String query, long startTime, List<Object> bindParameters) {
+ queryLogger.log(QueryLogState.STARTED,
+ getInitialDetails(tenantId, queryServices, query, startTime, bindParameters));
+
+ }
+
+ private static ImmutableMap<QueryLogInfo, Object> getInitialDetails(PName tenantId,
+ ConnectionQueryServices queryServices, String query, long startTime, List<Object> bindParameters) {
+ Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
+ String clientIP;
+ try {
+ clientIP = InetAddress.getLocalHost().getHostAddress();
+ } catch (UnknownHostException e) {
+ clientIP = "UnknownHost";
+ }
+ queryLogBuilder.put(QueryLogInfo.CLIENT_IP_I, clientIP);
+ queryLogBuilder.put(QueryLogInfo.QUERY_I, query);
+ queryLogBuilder.put(QueryLogInfo.START_TIME_I, startTime);
+ if (bindParameters != null) {
+ queryLogBuilder.put(QueryLogInfo.BIND_PARAMETERS_I, StringUtils.join(bindParameters,","));
+ }
+ if (tenantId != null) {
+ queryLogBuilder.put(QueryLogInfo.TENANT_ID_I, tenantId.getString());
+ }
+ queryLogBuilder.put(QueryLogInfo.USER_I, queryServices.getUserName() != null ? queryServices.getUserName()
+ : queryServices.getUser().getShortName());
+ return queryLogBuilder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
new file mode 100644
index 0000000..96e4bf9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import com.google.common.collect.ImmutableMap;
+import com.lmax.disruptor.EventFactory;
+
+ class RingBufferEvent {
+ private String queryId;
+ private QueryLogState logState;
+ private LogLevel connectionLogLevel;
+ private ImmutableMap<QueryLogInfo, Object> queryInfo;
+
+ public static final Factory FACTORY = new Factory();
+
+ /**
+ * Creates the events that will be put in the RingBuffer.
+ */
+ private static class Factory implements EventFactory<RingBufferEvent> {
+ @Override
+ public RingBufferEvent newInstance() {
+ final RingBufferEvent result = new RingBufferEvent();
+ return result;
+ }
+ }
+
+ public void clear() {
+ this.logState=null;
+ this.queryInfo=null;
+ this.queryId=null;
+ }
+
+
+ public String getQueryId() {
+ return queryId;
+ }
+
+ public static Factory getFactory() {
+ return FACTORY;
+ }
+
+ public QueryLogState getLogState() {
+ return logState;
+ }
+
+ public void setQueryInfo(ImmutableMap<QueryLogInfo, Object> queryInfo) {
+ this.queryInfo=queryInfo;
+
+ }
+
+ public void setQueryId(String queryId) {
+ this.queryId=queryId;
+
+ }
+
+ public ImmutableMap<QueryLogInfo, Object> getQueryInfo() {
+ return queryInfo;
+
+ }
+
+ public void setLogState(QueryLogState logState) {
+ this.logState=logState;
+
+ }
+
+
+ public LogLevel getConnectionLogLevel() {
+ return connectionLogLevel;
+ }
+
+
+ public void setConnectionLogLevel(LogLevel connectionLogLevel) {
+ this.connectionLogLevel = connectionLogLevel;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
new file mode 100644
index 0000000..653ddd6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import com.google.common.collect.ImmutableMap;
+import com.lmax.disruptor.EventTranslator;
+
+class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> {
+ private String queryId;
+ private QueryLogState logState;
+ private ImmutableMap<QueryLogInfo, Object> queryInfo;
+ private LogLevel connectionLogLevel;
+
+ public RingBufferEventTranslator(String queryId) {
+ this.queryId=queryId;
+ }
+
+ @Override
+ public void translateTo(RingBufferEvent event, long sequence) {
+ event.setQueryId(queryId);
+ event.setQueryInfo(queryInfo);
+ event.setLogState(logState);
+ event.setConnectionLogLevel(connectionLogLevel);
+ clear();
+ }
+
+ private void clear() {
+ setQueryInfo(null,null,null);
+ }
+
+ public void setQueryInfo(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> queryInfo,
+ LogLevel connectionLogLevel) {
+ this.queryInfo = queryInfo;
+ this.logState = logState;
+ this.connectionLogLevel = connectionLogLevel;
+ }
+
+}