You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/01/27 01:18:58 UTC
[14/26] phoenix git commit: PHOENIX-3584 Expose metrics for
ConnectionQueryServices instances and their allocators in the JVM
PHOENIX-3584 Expose metrics for ConnectionQueryServices instances and their allocators in the JVM
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f5de28b6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f5de28b6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f5de28b6
Branch: refs/heads/calcite
Commit: f5de28b6d79141c555c994a94af7b4078872d845
Parents: d8f4594
Author: Samarth <sa...@salesforce.com>
Authored: Tue Jan 10 17:36:42 2017 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Tue Jan 10 17:36:42 2017 -0800
----------------------------------------------------------------------
.../phoenix/monitoring/PhoenixMetricsIT.java | 135 +++++++++++++++++++
.../phoenix/monitoring/GlobalClientMetrics.java | 6 +-
.../apache/phoenix/monitoring/MetricType.java | 4 +-
.../query/ConnectionQueryServicesImpl.java | 10 ++
4 files changed, 153 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5de28b6/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 3af8ce7..16a66df 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -10,12 +10,14 @@
package org.apache.phoenix.monitoring;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER;
@@ -28,6 +30,7 @@ import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -44,10 +47,16 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -67,6 +76,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
.newArrayList(MetricType.MUTATION_COMMIT_TIME.name());
private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(),
MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name());
+ private static final String CUSTOM_URL_STRING = "SESSION";
+ private static final AtomicInteger numConnections = new AtomicInteger(0);
@BeforeClass
public static void doSetup() throws Exception {
@@ -76,6 +87,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
// disable renewing leases as this will force spooling to happen.
props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ // need the non-test driver for some tests that check number of hconnections, etc.
+ DriverManager.registerDriver(PhoenixDriver.INSTANCE);
}
@Test
@@ -827,5 +840,127 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
}
}
}
+
+    /** 100 concurrent connections to one URL must create at most one query services instance and HConnection. */
+    @Test
+    public void testGetConnectionsForSameUrlConcurrently() throws Exception {
+        // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
+        String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort();
+        String url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        ExecutorService exec = Executors.newFixedThreadPool(10);
+        try {
+            GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset();
+            GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset();
+            assertEquals(0, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
+            assertEquals(0, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+            // A prior PhoenixDriver connection (numConnections > 0) means no new HConnection or query
+            // services is expected after the reset above; otherwise exactly one of each.
+            int expectedHConnections = numConnections.get() > 0 ? 0 : 1;
+            List<Future<Connection>> futures = new ArrayList<>(100);
+            for (int i = 1; i <= 100; i++) {
+                futures.add(exec.submit(new GetConnectionCallable(url)));
+            }
+            for (Future<Connection> future : futures) {
+                Connection c = future.get();
+                try {
+                    c.close();
+                } catch (Exception ignore) {}
+            }
+            assertEquals(expectedHConnections, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
+            assertEquals(expectedHConnections, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+        } finally {
+            exec.shutdownNow();
+        }
+    }
+
+    /** Concurrent connections for 100 tenant ids on one URL must create at most one query services and HConnection. */
+    @Test
+    public void testGetConnectionsForDifferentTenantsConcurrently() throws Exception {
+        // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
+        String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort();
+        String url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        ExecutorService exec = Executors.newFixedThreadPool(10);
+        try {
+            GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset();
+            GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset();
+            assertEquals(0, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
+            assertEquals(0, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+            // A prior PhoenixDriver connection (numConnections > 0) means no new HConnection or query
+            // services is expected after the reset above; otherwise exactly one of each.
+            int expectedHConnections = numConnections.get() > 0 ? 0 : 1;
+            List<Future<Connection>> futures = new ArrayList<>(100);
+            for (int i = 1; i <= 100; i++) {
+                String tenantUrl = url + ';' + TENANT_ID_ATTRIB + '=' + i;
+                futures.add(exec.submit(new GetConnectionCallable(tenantUrl + ";")));
+            }
+            for (Future<Connection> future : futures) {
+                Connection c = future.get();
+                try {
+                    c.close();
+                } catch (Exception ignore) {}
+            }
+            assertEquals(expectedHConnections, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
+            assertEquals(expectedHConnections, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+        } finally {
+            exec.shutdownNow();
+        }
+    }
+
+    /** Each of 20 distinct custom-parameter URLs must get its own query services instance and HConnection. */
+    @Test
+    public void testGetConnectionsWithDifferentJDBCParamsConcurrently() throws Exception {
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+        ExecutorService exec = Executors.newFixedThreadPool(4);
+        // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
+        String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort();
+        String baseUrl = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        int connectionCount = 20; // named so it does not shadow the static numConnections counter
+        List<Future<Connection>> futures = new ArrayList<>(connectionCount);
+        try {
+            GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset();
+            GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset();
+            assertEquals(0, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
+            assertEquals(0, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+            for (int i = 1; i <= connectionCount; i++) {
+                String customUrl = baseUrl + ':' + CUSTOM_URL_STRING + '=' + i;
+                futures.add(exec.submit(new GetConnectionCallable(customUrl + ";")));
+            }
+            for (Future<Connection> future : futures) {
+                future.get();
+            }
+            assertEquals(connectionCount, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
+            assertEquals(connectionCount, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+        } finally {
+            for (Runnable pending : exec.shutdownNow()) { // never-started tasks would make get() block forever
+                if (pending instanceof Future) ((Future<?>) pending).cancel(false);
+            }
+            for (Future<Connection> future : futures) {
+                try {
+                    Connection c = future.get();
+                    // close the query services instance because we created a lot of HConnections.
+                    c.unwrap(PhoenixConnection.class).getQueryServices().close();
+                    c.close();
+                } catch (Exception ignore) {}
+            }
+        }
+    }
+
+    /** Task that opens a JDBC connection for a URL and counts the connections made through non-custom URLs. */
+    private static class GetConnectionCallable implements Callable<Connection> {
+        private final String url;
+        GetConnectionCallable(String url) { this.url = url; }
+        /** Opens the connection first, so a failed attempt is never counted. */
+        @Override
+        public Connection call() throws Exception {
+            final Connection connection = DriverManager.getConnection(url);
+            // Only URLs without the custom marker are counted. The tests read numConnections to
+            // figure out whether a new hconnection and query services will be created.
+            final boolean hasCustomMarker = url.contains(CUSTOM_URL_STRING);
+            if (hasCustomMarker) {
+                return connection;
+            }
+            numConnections.incrementAndGet();
+            return connection;
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5de28b6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index c3a7261..fab4d27 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.monitoring;
+import static org.apache.phoenix.monitoring.MetricType.HCONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
@@ -26,6 +27,7 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_SERVICES_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME;
import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
@@ -73,7 +75,9 @@ public enum GlobalClientMetrics {
GLOBAL_QUERY_TIMEOUT_COUNTER(QUERY_TIMEOUT_COUNTER),
GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER),
GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER),
- GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER);
+ GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER),
+ GLOBAL_QUERY_SERVICES_COUNTER(QUERY_SERVICES_COUNTER),
+ GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER);
private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
private GlobalMetric metric;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5de28b6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 6cfe977..b420b75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -41,7 +41,9 @@ public enum MetricType {
CACHE_REFRESH_SPLITS_COUNTER("Number of times cache was refreshed because of splits"),
WALL_CLOCK_TIME_MS("Wall clock time elapsed for the overall query execution"),
RESULT_SET_TIME_MS("Wall clock time elapsed for reading all records using resultSet.next()"),
- OPEN_PHOENIX_CONNECTIONS_COUNTER("Number of open phoenix connections");
+ OPEN_PHOENIX_CONNECTIONS_COUNTER("Number of open phoenix connections"),
+ QUERY_SERVICES_COUNTER("Number of ConnectionQueryServicesImpl instantiated"),
+ HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver");
private final String description;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5de28b6/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index be34f66..f2eb8e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -26,6 +26,8 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
@@ -151,6 +153,8 @@ import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.phoenix.monitoring.GlobalMetric;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
import org.apache.phoenix.protobuf.ProtobufUtil;
@@ -392,6 +396,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
QueryServices.TRANSACTIONS_ENABLED,
QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+ GLOBAL_HCONNECTIONS_COUNTER.increment();
+ logger.info("HConnnection established. Details: " + connection + " " + Throwables.getStackTraceAsString(new Exception()));
// only initialize the tx service client if needed and if we succeeded in getting a connection
// to HBase
if (transactionsEnabled) {
@@ -457,6 +463,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return;
}
closed = true;
+ GLOBAL_QUERY_SERVICES_COUNTER.decrement();
SQLException sqlE = null;
try {
// Attempt to return any unused sequences.
@@ -473,6 +480,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
// close the HBase connection
if (connection != null) connection.close();
+ GLOBAL_HCONNECTIONS_COUNTER.decrement();
} finally {
if (renewLeaseExecutor != null) {
renewLeaseExecutor.shutdownNow();
@@ -2360,6 +2368,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
boolean hConnectionEstablished = false;
boolean success = false;
try {
+ GLOBAL_QUERY_SERVICES_COUNTER.increment();
+ logger.info("An instance of ConnectionQueryServices was created: " + Throwables.getStackTraceAsString(new Exception()));
openConnection();
hConnectionEstablished = true;
boolean isDoNotUpgradePropSet = UpgradeUtil.isNoUpgradeSet(props);