You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/01/27 01:18:58 UTC

[14/26] phoenix git commit: PHOENIX-3584 Expose metrics for ConnectionQueryServices instances and their allocators in the JVM

PHOENIX-3584 Expose metrics for ConnectionQueryServices instances and their allocators in the JVM


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f5de28b6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f5de28b6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f5de28b6

Branch: refs/heads/calcite
Commit: f5de28b6d79141c555c994a94af7b4078872d845
Parents: d8f4594
Author: Samarth <sa...@salesforce.com>
Authored: Tue Jan 10 17:36:42 2017 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Tue Jan 10 17:36:42 2017 -0800

----------------------------------------------------------------------
 .../phoenix/monitoring/PhoenixMetricsIT.java    | 135 +++++++++++++++++++
 .../phoenix/monitoring/GlobalClientMetrics.java |   6 +-
 .../apache/phoenix/monitoring/MetricType.java   |   4 +-
 .../query/ConnectionQueryServicesImpl.java      |  10 ++
 4 files changed, 153 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5de28b6/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 3af8ce7..16a66df 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -10,12 +10,14 @@
 package org.apache.phoenix.monitoring;
 
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER;
@@ -28,6 +30,7 @@ import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -44,10 +47,16 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -67,6 +76,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             .newArrayList(MetricType.MUTATION_COMMIT_TIME.name());
     private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(),
             MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name());
+    private static final String CUSTOM_URL_STRING = "SESSION";
+    private static final AtomicInteger numConnections = new AtomicInteger(0); 
 
     @BeforeClass
     public static void doSetup() throws Exception {
@@ -76,6 +87,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         // disable renewing leases as this will force spooling to happen.
         props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        // need the non-test driver for some tests that check number of hconnections, etc.
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
     }
 
     @Test
@@ -827,5 +840,127 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             }
         }
     }
+    
+    // Opens 100 connections to a single URL on a 10-thread pool and closes them; both global
+    // counters must then read 1, or 0 if numConnections shows an earlier counted connection.
+    @Test
+    public void testGetConnectionsForSameUrlConcurrently() throws Exception {
+        // Build the url from the test utility's ZK client port. Need PhoenixDriver, not PhoenixTestDriver.
+        String quorum = "localhost:" + getUtility().getZkCluster().getClientPort();
+        String jdbcUrl = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + quorum;
+        ExecutorService pool = Executors.newFixedThreadPool(10);
+        try {
+            GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset();
+            GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset();
+            assertEquals(0, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
+            assertEquals(0, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+            List<Callable<Connection>> tasks = new ArrayList<>(100);
+            List<Future<Connection>> pending = new ArrayList<>(100);
+            int expectedCount = numConnections.get() <= 0 ? 1 : 0;
+            for (int submitted = 0; submitted < 100; submitted++) {
+                Callable<Connection> task = new GetConnectionCallable(jdbcUrl);
+                tasks.add(task);
+                pending.add(pool.submit(task));
+            }
+            for (Future<Connection> result : pending) {
+                Connection conn = result.get();
+                try { conn.close(); } catch (Exception ignore) {}
+            }
+            assertEquals(expectedCount, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
+            assertEquals(expectedCount, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+        } finally {
+            pool.shutdownNow();
+        }
+    }
+    
+    // Opens one connection per tenant id 1..100 on a 10-thread pool and closes them; both global
+    // counters must then read 1, or 0 if numConnections shows an earlier counted connection.
+    @Test
+    public void testGetConnectionsForDifferentTenantsConcurrently() throws Exception {
+        // Build the url from the test utility's ZK client port. Need PhoenixDriver, not PhoenixTestDriver.
+        String quorum = "localhost:" + getUtility().getZkCluster().getClientPort();
+        String baseUrl = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + quorum;
+        ExecutorService pool = Executors.newFixedThreadPool(10);
+        try {
+            GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset();
+            GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset();
+            assertEquals(0, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
+            assertEquals(0, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+            int expectedCount = numConnections.get() <= 0 ? 1 : 0;
+            List<Callable<Connection>> tasks = new ArrayList<>(100);
+            List<Future<Connection>> pending = new ArrayList<>(100);
+            for (int tenant = 1; tenant <= 100; tenant++) {
+                String tenantUrl = baseUrl + ';' + TENANT_ID_ATTRIB + '=' + tenant;
+                Callable<Connection> task = new GetConnectionCallable(tenantUrl + ";");
+                tasks.add(task);
+                pending.add(pool.submit(task));
+            }
+            for (Future<Connection> result : pending) {
+                Connection conn = result.get();
+                try { conn.close(); } catch (Exception ignore) {}
+            }
+            assertEquals(expectedCount, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
+            assertEquals(expectedCount, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+        } finally {
+            pool.shutdownNow();
+        }
+    }
+    
+    // Opens 20 connections whose URLs each carry a distinct custom suffix; expects 20 on both counters.
+    @Test
+    public void testGetConnectionsWithDifferentJDBCParamsConcurrently() throws Exception {
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+        ExecutorService exec = Executors.newFixedThreadPool(4);
+        // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
+        String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort();
+        String baseUrl = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        int connectionCount = 20; // named so it does not shadow the static numConnections counter
+        List<Future<Connection>> futures = new ArrayList<>(connectionCount);
+        try {
+            GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset();
+            GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset();
+            assertEquals(0, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
+            assertEquals(0, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+            for (int i = 1; i <= connectionCount; i++) {
+                String customUrl = baseUrl + ':' +  CUSTOM_URL_STRING + '=' + i;
+                futures.add(exec.submit(new GetConnectionCallable(customUrl + ";")));
+            }
+            for (Future<Connection> future : futures) {
+                future.get();
+            }
+            assertEquals(connectionCount, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue());
+            assertEquals(connectionCount, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+        } finally {
+            exec.shutdownNow();
+            for (Future<Connection> future : futures) {
+                // shutdownNow() never runs queued tasks; cancel so get() below cannot block on them forever.
+                future.cancel(true);
+                try {
+                    Connection c = future.get();
+                    // close the query services instance because we created a lot of HConnections.
+                    c.unwrap(PhoenixConnection.class).getQueryServices().close();
+                    c.close();
+                } catch (Exception ignore) {}
+            }
+        }
+    }
+    
+    /** Task that opens a JDBC connection for its URL and tallies URLs lacking CUSTOM_URL_STRING. */
+    private static class GetConnectionCallable implements Callable<Connection> {
+        private final String jdbcUrl;
+        GetConnectionCallable(String url) {
+            this.jdbcUrl = url;
+        }
+        @Override
+        public Connection call() throws Exception {
+            final Connection opened = DriverManager.getConnection(jdbcUrl);
+            // Tests read numConnections to decide whether a new hconnection/query services is expected.
+            if (jdbcUrl.contains(CUSTOM_URL_STRING)) {
+                return opened;
+            }
+            numConnections.incrementAndGet();
+            return opened;
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5de28b6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index c3a7261..fab4d27 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.monitoring;
 
+import static org.apache.phoenix.monitoring.MetricType.HCONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
@@ -26,6 +27,7 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.QUERY_SERVICES_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
@@ -73,7 +75,9 @@ public enum GlobalClientMetrics {
     GLOBAL_QUERY_TIMEOUT_COUNTER(QUERY_TIMEOUT_COUNTER),
     GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER),
     GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER),
-    GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER);
+    GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER),
+    GLOBAL_QUERY_SERVICES_COUNTER(QUERY_SERVICES_COUNTER),
+    GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER);
     
     private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
     private GlobalMetric metric;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5de28b6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 6cfe977..b420b75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -41,7 +41,9 @@ public enum MetricType {
     CACHE_REFRESH_SPLITS_COUNTER("Number of times cache was refreshed because of splits"),
     WALL_CLOCK_TIME_MS("Wall clock time elapsed for the overall query execution"),
     RESULT_SET_TIME_MS("Wall clock time elapsed for reading all records using resultSet.next()"),
-    OPEN_PHOENIX_CONNECTIONS_COUNTER("Number of open phoenix connections");
+    OPEN_PHOENIX_CONNECTIONS_COUNTER("Number of open phoenix connections"),
+    QUERY_SERVICES_COUNTER("Number of ConnectionQueryServicesImpl instantiated"),
+    HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver");
     
     private final String description;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5de28b6/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index be34f66..f2eb8e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -26,6 +26,8 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
@@ -151,6 +153,8 @@ import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.phoenix.monitoring.GlobalMetric;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.protobuf.ProtobufUtil;
@@ -392,6 +396,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     QueryServices.TRANSACTIONS_ENABLED,
                     QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
             this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+            GLOBAL_HCONNECTIONS_COUNTER.increment();
+            logger.info("HConnnection established. Details: " + connection + " " +  Throwables.getStackTraceAsString(new Exception()));
             // only initialize the tx service client if needed and if we succeeded in getting a connection
             // to HBase
             if (transactionsEnabled) {
@@ -457,6 +463,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 return;
             }
             closed = true;
+            GLOBAL_QUERY_SERVICES_COUNTER.decrement();
             SQLException sqlE = null;
             try {
                 // Attempt to return any unused sequences.
@@ -473,6 +480,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     try {
                         // close the HBase connection
                         if (connection != null) connection.close();
+                        GLOBAL_HCONNECTIONS_COUNTER.decrement();
                     } finally {
                         if (renewLeaseExecutor != null) {
                             renewLeaseExecutor.shutdownNow();
@@ -2360,6 +2368,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         boolean hConnectionEstablished = false;
                         boolean success = false;
                         try {
+                            GLOBAL_QUERY_SERVICES_COUNTER.increment();
+                            logger.info("An instance of ConnectionQueryServices was created: " + Throwables.getStackTraceAsString(new Exception()));
                             openConnection();
                             hConnectionEstablished = true;
                             boolean isDoNotUpgradePropSet = UpgradeUtil.isNoUpgradeSet(props);