You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/04/14 07:37:19 UTC
[03/21] phoenix git commit: PHOENIX-2715 Query Log (Ankit Singhal)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
new file mode 100644
index 0000000..c102855
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Writes RingBuffer log events into the query log table; events are dropped with a warning when it is unavailable.
+ */
+public class TableLogWriter implements LogWriter {
+    private static final Log LOG = LogFactory.getLog(TableLogWriter.class);
+    private Connection connection;
+    private boolean isClosed;
+    private Table table;
+
+    public TableLogWriter(Configuration configuration) {
+        try {
+            this.connection = ConnectionFactory.createConnection(configuration);
+            table = this.connection.getTable(SchemaUtil.getPhysicalTableName(
+                    SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_SCHEMA, SYSTEM_LOG_TABLE), configuration));
+        } catch (Exception e) {
+            LOG.warn("Unable to initiate LogWriter for writing query logs to table", e);
+        }
+    }
+
+    /** Stores the event as one Put keyed by its query id, keeping only columns enabled at the connection log level. */
+    @Override
+    public void write(RingBufferEvent event) throws SQLException, IOException {
+        if (isClosed()) {
+            LOG.warn("Unable to commit query log as Log committer is already closed");
+            return;
+        }
+        if (table == null || connection == null) {
+            LOG.warn("Unable to commit query log as connection was not initiated ");
+            return;
+        }
+        int connectionLevel = event.getConnectionLogLevel().ordinal();
+        ImmutableMap<QueryLogInfo, Object> queryInfo = event.getQueryInfo();
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        Put put = new Put(Bytes.toBytes(event.getQueryId()));
+        for (Entry<QueryLogInfo, Object> entry : queryInfo.entrySet()) {
+            if (entry.getKey().logLevel.ordinal() <= connectionLevel) {
+                addColumn(put, ptr, entry.getKey(), entry.getValue());
+            }
+        }
+        QueryLogState state = event.getLogState();
+        if (QueryLogInfo.QUERY_STATUS_I.logLevel.ordinal() <= connectionLevel
+                && (state == QueryLogState.COMPLETED || state == QueryLogState.FAILED)) {
+            addColumn(put, ptr, QueryLogInfo.QUERY_STATUS_I, state.toString());
+        }
+        put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+        table.put(put);
+    }
+
+    /** Serializes {@code value} with the column's data type and adds it to {@code put} under the default family. */
+    private static void addColumn(Put put, ImmutableBytesWritable ptr, QueryLogInfo info, Object value) throws SQLException {
+        LiteralExpression expression = LiteralExpression.newConstant(value, info.dataType, Determinism.ALWAYS);
+        expression.evaluate(null, ptr);
+        put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes(info.columnName), ByteUtil.copyKeyBytesIfNecessary(ptr));
+    }
+
+    /** Closes the table and then the connection; a failure in one is logged and does not skip the other. */
+    @Override
+    public void close() throws IOException {
+        if (isClosed()) {
+            return;
+        }
+        isClosed = true;
+        try {
+            if (table != null) {
+                table.close();
+            }
+        } catch (IOException e) {
+            LOG.warn("Unable to close query log table", e);
+        }
+        try {
+            if (connection != null && !connection.isClosed()) {
+                connection.close();
+            }
+        } catch (IOException e) {
+            LOG.warn("Unable to close query log connection", e);
+        }
+    }
+
+    public boolean isClosed() {
+        return isClosed;
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
index 4fd1194..c008635 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
@@ -1,27 +1,21 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.monitoring;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
@@ -44,6 +38,8 @@ public class ReadMetricQueue {
private final ConcurrentMap<MetricKey, Queue<CombinableMetric>> metricsMap = new ConcurrentHashMap<>();
+ private final List<ScanMetricsHolder> scanMetricsHolderList = new ArrayList<ScanMetricsHolder>();
+
private final boolean isRequestMetricsEnabled;
public ReadMetricQueue(boolean isRequestMetricsEnabled) {
@@ -85,7 +81,7 @@ public class ReadMetricQueue {
}
return publishedMetrics;
}
-
+
public void clearMetrics() {
metricsMap.clear(); // help gc
}
@@ -177,8 +173,18 @@ public class ReadMetricQueue {
return q;
}
- public boolean isRequestMetricsEnabled() {
- return isRequestMetricsEnabled;
- }
+ public boolean isRequestMetricsEnabled() {
+ return isRequestMetricsEnabled;
+ }
+
+ public void addScanHolder(ScanMetricsHolder holder){
+ scanMetricsHolderList.add(holder);
+ }
+
+ public List<ScanMetricsHolder> getScanMetricsHolderList() {
+ return scanMetricsHolderList;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
index 6bcd402..9125cd8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
@@ -17,20 +17,23 @@
*/
package org.apache.phoenix.monitoring;
-import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS;
-import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_CALLS;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_IN_REMOTE_RESULTS;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_REGION_SERVER_RESULTS;
import static org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS;
import static org.apache.phoenix.monitoring.MetricType.COUNT_NOT_SERVING_REGION_EXCEPTION;
-import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_REGION_SERVER_RESULTS;
-import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_IN_REMOTE_RESULTS;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_CALLS;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_RETRIES;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_FILTERED;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES;
import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS;
-import org.apache.hadoop.hbase.client.Scan;
+import java.io.IOException;
+import java.util.Map;
-import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES;
-import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_RETRIES;
-import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED;
-import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_FILTERED;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.JsonMapper;
public class ScanMetricsHolder {
@@ -45,9 +48,11 @@ public class ScanMetricsHolder {
private final CombinableMetric countOfRemoteRPCRetries;
private final CombinableMetric countOfRowsScanned;
private final CombinableMetric countOfRowsFiltered;
+ private Map<String, Long> scanMetricMap;
+ private Object scan;
private static final ScanMetricsHolder NO_OP_INSTANCE =
- new ScanMetricsHolder(new ReadMetricQueue(false), "");
+ new ScanMetricsHolder(new ReadMetricQueue(false), "",null);
public static ScanMetricsHolder getInstance(ReadMetricQueue readMetrics, String tableName,
Scan scan, boolean isRequestMetricsEnabled) {
@@ -55,10 +60,12 @@ public class ScanMetricsHolder {
return NO_OP_INSTANCE;
}
scan.setScanMetricsEnabled(true);
- return new ScanMetricsHolder(readMetrics, tableName);
+ return new ScanMetricsHolder(readMetrics, tableName, scan);
}
- private ScanMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+ private ScanMetricsHolder(ReadMetricQueue readMetrics, String tableName,Scan scan) {
+ readMetrics.addScanHolder(this);
+ this.scan=scan;
countOfRPCcalls = readMetrics.allotMetric(COUNT_RPC_CALLS, tableName);
countOfRemoteRPCcalls = readMetrics.allotMetric(COUNT_REMOTE_RPC_CALLS, tableName);
sumOfMillisSecBetweenNexts = readMetrics.allotMetric(COUNT_MILLS_BETWEEN_NEXTS, tableName);
@@ -118,4 +125,21 @@ public class ScanMetricsHolder {
return countOfRowsScanned;
}
+ public Map<String, Long> getScanMetricMap() {
+ return scanMetricMap;
+ }
+
+ public void setScanMetricMap(Map<String, Long> scanMetricMap) {
+ this.scanMetricMap = scanMetricMap;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return "{\"scan\":" + scan + ", \"scanMetrics\":" + JsonMapper.writeObjectAsString(scanMetricMap) + "}";
+ } catch (IOException e) {
+ return "{\"Exception while converting scan metrics to Json\":\"" + e.getMessage() + "\"}";
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 90f8089..0b72ada 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -30,12 +30,14 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.log.QueryLoggerDisruptor;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PName;
@@ -146,4 +148,8 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
void upgradeSystemTables(String url, Properties props) throws SQLException;
public Configuration getConfiguration();
+
+ public User getUser();
+
+ public QueryLoggerDisruptor getQueryDisruptor();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 6df2f80..8c7441a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -124,6 +124,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -183,6 +184,7 @@ import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.log.QueryLoggerDisruptor;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
import org.apache.phoenix.protobuf.ProtobufUtil;
@@ -267,6 +269,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// don't need.
private final ReadOnlyProps props;
private final String userName;
+ private final User user;
private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
private final GuidePostsCache tableStatsCache;
@@ -336,6 +339,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return hbaseVersion >= PhoenixDatabaseMetaData.MIN_RENEW_LEASE_VERSION;
}
});
+ private QueryLoggerDisruptor queryDisruptor;
private PMetaData newEmptyMetaData() {
return new PSynchronizedMetaData(new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, getProps()));
@@ -372,6 +376,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
ConfigUtil.setReplicationConfigIfAbsent(this.config);
this.props = new ReadOnlyProps(this.config.iterator());
this.userName = connectionInfo.getPrincipal();
+ this.user = connectionInfo.getUser();
this.latestMetaData = newEmptyMetaData();
// TODO: should we track connection wide memory usage or just org-wide usage?
// If connection-wide, create a MemoryManager here, otherwise just use the one from the delegate
@@ -396,6 +401,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
this.maxConnectionsAllowed = config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,
QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS);
this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0);
+ try {
+ this.queryDisruptor = new QueryLoggerDisruptor(this.config);
+ } catch (SQLException e) {
+ // Query logging is optional: report the cause through the logger and continue without it.
+ logger.warn("Unable to initiate query logging service !!", e);
+ }
}
@@ -477,6 +488,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
closed = true;
GLOBAL_QUERY_SERVICES_COUNTER.decrement();
+ try {
+ if (this.queryDisruptor != null) {
+ this.queryDisruptor.close();
+ }
+ } catch (Exception e) {
+ // Ignore
+ }
SQLException sqlE = null;
try {
// Attempt to return any unused sequences.
@@ -2611,7 +2629,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
metaConnection.createStatement().execute(QueryConstants.CREATE_FUNCTION_METADATA);
} catch (TableAlreadyExistsException ignore) {}
-
+ try {
+ metaConnection.createStatement().execute(QueryConstants.CREATE_LOG_METADATA);
+ } catch (TableAlreadyExistsException ignore) {}
// Catch the IOException to log the error message and then bubble it up for the client to retry.
try {
createSysMutexTableIfNotExists(hbaseAdmin, ConnectionQueryServicesImpl.this.getProps());
@@ -2966,6 +2986,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA);
} catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
+ try {
+ metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_LOG_METADATA);
+ } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
ConnectionQueryServicesImpl.this.upgradeRequired.set(false);
success = true;
} catch (UpgradeInProgressException | UpgradeNotRequiredException e) {
@@ -4063,6 +4086,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public String getUserName() {
return userName;
}
+
+ @Override
+ public User getUser() {
+ return user;
+ }
private void checkClosed() {
if (closed) {
@@ -4488,4 +4516,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public Configuration getConfiguration() {
return config;
}
+
+ @Override
+ public QueryLoggerDisruptor getQueryDisruptor() {
+ return this.queryDisruptor;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index c510b5a..ad354d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -55,6 +56,7 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.log.QueryLoggerDisruptor;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
import org.apache.phoenix.schema.FunctionNotFoundException;
@@ -112,10 +114,13 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
private final Map<String, List<HRegionLocation>> tableSplits = Maps.newHashMap();
private final GuidePostsCache guidePostsCache;
private final Configuration config;
+
+ private User user;
public ConnectionlessQueryServicesImpl(QueryServices services, ConnectionInfo connInfo, Properties info) {
super(services);
userName = connInfo.getPrincipal();
+ user = connInfo.getUser();
metaData = newEmptyMetaData();
// Use KeyValueBuilder that builds real KeyValues, as our test utils require this
@@ -328,6 +333,9 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA);
} catch (NewerTableAlreadyExistsException ignore) {
}
+ try {
+ metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_LOG_METADATA);
+ } catch (NewerTableAlreadyExistsException ignore) {}
} catch (SQLException e) {
sqlE = e;
} finally {
@@ -664,4 +672,14 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
public Configuration getConfiguration() {
return config;
}
+
+ @Override
+ public User getUser() {
+ return user;
+ }
+
+ @Override
+ public QueryLoggerDisruptor getQueryDisruptor() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 05d1af6..f5c8a59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -30,12 +30,14 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.log.QueryLoggerDisruptor;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
import org.apache.phoenix.schema.PColumn;
@@ -351,4 +353,16 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public Configuration getConfiguration() {
return getDelegate().getConfiguration();
}
+
+ @Override
+ public User getUser() {
+ return getDelegate().getUser();
+ }
+
+ @Override
+ public QueryLoggerDisruptor getQueryDisruptor() {
+ return getDelegate().getQueryDisruptor();
+ }
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 7607388..ae12e01 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -109,6 +109,20 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITERATED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
import java.math.BigDecimal;
@@ -124,6 +138,7 @@ import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableProperty;
/**
@@ -395,10 +410,40 @@ public interface QueryConstants {
// Install split policy to prevent a tenant's metadata from being split across regions.
HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" +
PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
+
+ public static final String CREATE_LOG_METADATA =
+ "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"(\n" +
+ // Identification columns; only QUERY_ID is part of the primary key (see the constraint below)
+ TENANT_ID + " VARCHAR ," +
+ QUERY_ID + " VARCHAR NOT NULL,\n" +
+ USER + " VARCHAR , \n" +
+ CLIENT_IP + " VARCHAR, \n" +
+ // Query text and its explain plan
+ QUERY + " VARCHAR, \n" +
+ EXPLAIN_PLAN + " VARCHAR, \n" +
+ // Execution timing, result and diagnostic columns
+ START_TIME + " TIMESTAMP, \n" +
+ TOTAL_EXECUTION_TIME + " BIGINT, \n" +
+ NO_OF_RESULTS_ITERATED + " BIGINT, \n" +
+ QUERY_STATUS + " VARCHAR, \n" +
+ EXCEPTION_TRACE + " VARCHAR, \n" +
+ GLOBAL_SCAN_DETAILS + " VARCHAR, \n" +
+ BIND_PARAMETERS + " VARCHAR, \n" +
+ SCAN_METRICS_JSON + " VARCHAR, \n" +
+ " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (QUERY_ID))\n" +
+ HConstants.VERSIONS + "= " + MetaDataProtocol.DEFAULT_LOG_VERSIONS + ",\n" +
+ HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n"+
+ // Install the same MetaDataSplitPolicy that the metadata table definition above uses.
+ HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" +
+ PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE+ ",\n" +
+ HColumnDescriptor.TTL + "=" + MetaDataProtocol.DEFAULT_LOG_TTL+",\n"+
+ TableProperty.COLUMN_ENCODED_BYTES.toString()+" = 0";
+
public static final byte[] OFFSET_FAMILY = "f_offset".getBytes();
public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
public static final String LAST_SCAN = "LAST_SCAN";
public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
public static final String HASH_JOIN_CACHE_RETRIES = "hashjoin.client.retries.number";
public static final int DEFAULT_HASH_JOIN_CACHE_RETRIES = 5;
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 0b18aaa..43b9e5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -301,6 +301,10 @@ public interface QueryServices extends SQLCloseable {
// Whether to enable cost-based-decision in the query optimizer
public static final String COST_BASED_OPTIMIZER_ENABLED = "phoenix.costbased.optimizer.enabled";
public static final String SMALL_SCAN_THRESHOLD_ATTRIB = "phoenix.query.smallScanThreshold";
+ public static final String LOG_LEVEL = "phoenix.log.level";
+ public static final String LOG_BUFFER_SIZE = "phoenix.log.buffer.size";
+ public static final String LOG_BUFFER_WAIT_STRATEGY = "phoenix.log.wait.strategy";
+ public static final String LOG_SAMPLE_RATE = "phoenix.log.sample.rate";
/**
* Get executor service used for parallel scans
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 961ab9f..58c9812 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -47,6 +47,8 @@ import static org.apache.phoenix.query.QueryServices.IS_NAMESPACE_MAPPING_ENABLE
import static org.apache.phoenix.query.QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE;
import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.LOG_LEVEL;
+import static org.apache.phoenix.query.QueryServices.LOG_SAMPLE_RATE;
import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
@@ -107,6 +109,7 @@ import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTableRefFactory;
@@ -347,6 +350,8 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = false;
public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = false;
+ public static final String DEFAULT_LOGGING_LEVEL = LogLevel.OFF.name();
+ public static final String DEFAULT_LOG_SAMPLE_RATE = "1.0";
private final Configuration config;
@@ -428,7 +433,9 @@ public class QueryServicesOptions {
.setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION)
.setIfUnset(COST_BASED_OPTIMIZER_ENABLED, DEFAULT_COST_BASED_OPTIMIZER_ENABLED)
.setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING)
- .setIfUnset(PHOENIX_ACLS_ENABLED, DEFAULT_PHOENIX_ACLS_ENABLED);
+ .setIfUnset(PHOENIX_ACLS_ENABLED, DEFAULT_PHOENIX_ACLS_ENABLED)
+ .setIfUnset(LOG_LEVEL, DEFAULT_LOGGING_LEVEL)
+ .setIfUnset(LOG_SAMPLE_RATE, DEFAULT_LOG_SAMPLE_RATE);
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
// it to 1, so we'll change it.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 73964c0..970428a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,6 +120,7 @@
<jackson.version>${cdh.jackson.version}</jackson.version>
<antlr.version>3.5.2</antlr.version>
<log4j.version>1.2.17</log4j.version>
+ <disruptor.version>3.3.6</disruptor.version>
<slf4j.version>${cdh.slf4j.version}</slf4j.version>
<protobuf-java.version>2.5.0</protobuf-java.version>
<commons-io.version>2.1</commons-io.version>
@@ -994,6 +995,11 @@
<artifactId>javax.servlet-api</artifactId>
<version>${servlet.api.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${disruptor.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>