You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2017/03/01 00:35:19 UTC

phoenix git commit: PHOENIX-3663 - Implement resource controls in Phoenix JDBC driver.

Repository: phoenix
Updated Branches:
  refs/heads/master 8e1d10b3f -> cbc43bbb6


PHOENIX-3663 - Implement resource controls in Phoenix JDBC driver.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cbc43bbb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cbc43bbb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cbc43bbb

Branch: refs/heads/master
Commit: cbc43bbb6fbcbd3bdcb5a246d02edecb0e939b43
Parents: 8e1d10b
Author: Geoffrey Jacoby <gj...@apache.org>
Authored: Tue Feb 28 16:34:26 2017 -0800
Committer: Geoffrey Jacoby <gj...@apache.org>
Committed: Tue Feb 28 16:34:26 2017 -0800

----------------------------------------------------------------------
 .../phoenix/monitoring/PhoenixMetricsIT.java    | 39 +++++++++++++++++++-
 .../phoenix/exception/SQLExceptionCode.java     |  5 ++-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  2 +
 .../org/apache/phoenix/jdbc/PhoenixDriver.java  |  3 --
 .../phoenix/monitoring/GlobalClientMetrics.java |  8 +++-
 .../apache/phoenix/monitoring/MetricType.java   |  5 ++-
 .../query/ConnectionQueryServicesImpl.java      | 16 +++++++-
 .../org/apache/phoenix/query/QueryServices.java |  5 ++-
 .../phoenix/query/QueryServicesOptions.java     |  4 +-
 9 files changed, 75 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 4d075ab..04d125a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -17,6 +17,8 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
@@ -55,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
@@ -850,7 +853,41 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             exec.shutdownNow();
         }
     }
-    
+
+    @Test
+    public void testGetConnectionsThrottledForSameUrl() throws Exception {
+        int expectedPhoenixConnections = 11;
+        List<Connection> connections = Lists.newArrayList();
+        String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort();
+        String url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum +
+        ':' +  CUSTOM_URL_STRING + '=' + "throttletest";
+
+        Properties props = new Properties();
+        props.setProperty(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, "10");
+
+        GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset();
+        GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset();
+        GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.getMetric().reset();
+        GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.getMetric().reset();
+        boolean wasThrottled = false;
+        try {
+            for (int k = 0; k < expectedPhoenixConnections; k++) {
+                connections.add(DriverManager.getConnection(url, props));
+            }
+        } catch (SQLException se) {
+            wasThrottled = true;
+            assertEquals(SQLExceptionCode.NEW_CONNECTION_THROTTLED.getErrorCode(), se.getErrorCode());
+        } finally {
+            for (Connection c : connections) {
+                c.close();
+            }
+        }
+        assertEquals(1, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+        assertTrue("No connection was throttled!", wasThrottled);
+        assertEquals(1, GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.getMetric().getValue());
+        assertEquals(expectedPhoenixConnections, GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.getMetric().getValue());
+    }
+
     @Test
     public void testGetConnectionsForDifferentTenantsConcurrently()  throws Exception {
         // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 8595eda..1e48640 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -426,7 +426,10 @@ public enum SQLExceptionCode {
                     "Cannot create schema because config " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED
                             + " for enabling name space mapping isn't enabled."), INCONSISTENET_NAMESPACE_MAPPING_PROPERTIES(
                                     726, "43M10", " Inconsistent namespace mapping properites.."), ASYNC_NOT_ALLOWED(
-                                    727, "43M11", " ASYNC option is not allowed.. ");
+                                    727, "43M11", " ASYNC option is not allowed.. "),
+    NEW_CONNECTION_THROTTLED(728, "410M1", "Could not create connection " +
+        "because this client already has the maximum number" +
+        " of connections to the target cluster.");
 
     private final int errorCode;
     private final String sqlState;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index cb2390e..5f5237f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.jdbc;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Collections.emptyMap;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -214,6 +215,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
     }
     
     public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade) throws SQLException {
+        GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment();
         this.url = url;
         this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade;
         // Copy so client cannot change

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index b2acacf..67ac9c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -149,8 +149,6 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
 
     private Cache<ConnectionInfo, ConnectionQueryServices> initializeConnectionCache() {
         Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
-        int maxCacheSize = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_SIZE,
-            QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE);
         int maxCacheDuration = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS,
             QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION);
         RemovalListener<ConnectionInfo, ConnectionQueryServices> cacheRemovalListener =
@@ -170,7 +168,6 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
                 }
             };
         return CacheBuilder.newBuilder()
-            .maximumSize(maxCacheSize)
             .expireAfterAccess(maxCacheDuration, TimeUnit.MILLISECONDS)
             .removalListener(cacheRemovalListener)
             .build();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index fab4d27..b5f9422 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -39,7 +39,8 @@ import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
 import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
 import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
-
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -77,7 +78,10 @@ public enum GlobalClientMetrics {
     GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER),
     GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER),
     GLOBAL_QUERY_SERVICES_COUNTER(QUERY_SERVICES_COUNTER),
-    GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER);
+    GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER),
+    GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(PHOENIX_CONNECTIONS_THROTTLED_COUNTER),
+    GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER(PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER);
+
     
     private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
     private GlobalMetric metric;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index b420b75..7b21de5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -43,7 +43,10 @@ public enum MetricType {
     RESULT_SET_TIME_MS("Wall clock time elapsed for reading all records using resultSet.next()"),
     OPEN_PHOENIX_CONNECTIONS_COUNTER("Number of open phoenix connections"),
     QUERY_SERVICES_COUNTER("Number of ConnectionQueryServicesImpl instantiated"),
-    HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver");
+    HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver"),
+    PHOENIX_CONNECTIONS_THROTTLED_COUNTER("Number of client Phoenix connections prevented from opening " +
+                                              "because there are already too many to that target cluster."),
+    PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("Number of requests for Phoenix connections, whether successful or not.");
     
     private final String description;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 2329432..03a5e13 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -49,6 +49,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
 import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
@@ -307,6 +308,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final boolean renewLeaseEnabled;
     private final boolean isAutoUpgradeEnabled;
     private final AtomicBoolean upgradeRequired = new AtomicBoolean(false);
+    private final int maxConnectionsAllowed;
+    private final boolean shouldThrottleNumConnections;
     public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
     public static final byte[] UPGRADE_MUTEX_LOCKED = "UPGRADE_MUTEX_LOCKED".getBytes();
     public static final byte[] UPGRADE_MUTEX_UNLOCKED = "UPGRADE_MUTEX_UNLOCKED".getBytes();
@@ -387,6 +390,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         // A little bit of a smell to leak `this` here, but should not be a problem
         this.tableStatsCache = new GuidePostsCache(this, config);
         this.isAutoUpgradeEnabled = config.getBoolean(AUTO_UPGRADE_ENABLED, QueryServicesOptions.DEFAULT_AUTO_UPGRADE_ENABLED);
+        this.maxConnectionsAllowed = config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,
+            QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS);
+        this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0);
+
     }
 
     @Override
@@ -3796,12 +3803,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     @Override
     public void addConnection(PhoenixConnection connection) throws SQLException {
-        connectionQueues.get(getQueueIndex(connection)).add(new WeakReference<PhoenixConnection>(connection));
-        if (returnSequenceValues) {
+        if (returnSequenceValues || shouldThrottleNumConnections) {
             synchronized (connectionCountLock) {
+                if (shouldThrottleNumConnections && connectionCount + 1 > maxConnectionsAllowed){
+                    GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED).
+                        build().buildException();
+                }
                 connectionCount++;
             }
         }
+        connectionQueues.get(getQueueIndex(connection)).add(new WeakReference<PhoenixConnection>(connection));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 1366add..0b7b737 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -238,9 +238,12 @@ public interface QueryServices extends SQLCloseable {
     public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding";
     public static final String AUTO_UPGRADE_ENABLED = "phoenix.autoupgrade.enabled";
 
-    public static final String CLIENT_CONNECTION_CACHE_MAX_SIZE = "phoenix.client.connection.cache.max.size";
     public static final String CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS =
         "phoenix.client.connection.max.duration";
+
+    //max number of connections from a single client to a single cluster. 0 is unlimited.
+    public static final String CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS =
+        "phoenix.client.connection.max.allowed.connections";
     public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB  = "phoenix.default.column.encoded.bytes.attrib";
     public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB  = "phoenix.default.immutable.storage.scheme";
     public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB  = "phoenix.default.multitenant.immutable.storage.scheme";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index f885d5c..4fd1344 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -268,12 +268,14 @@ public class QueryServicesOptions {
     
     public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString();
     public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true;
-    public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE = 100;
     public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION = 86400000;
     public static final int DEFAULT_COLUMN_ENCODED_BYTES = QualifierEncodingScheme.TWO_BYTE_QUALIFIERS.getSerializedMetadataValue();
     public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS.toString();
     public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.ONE_CELL_PER_COLUMN.toString();
 
+    //by default, max connections from one client to one cluster is unlimited
+    public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0;
+
     @SuppressWarnings("serial")
     public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {
       {