You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2017/03/01 00:35:19 UTC
phoenix git commit: PHOENIX-3663 - Implement resource controls in
Phoenix JDBC driver.
Repository: phoenix
Updated Branches:
refs/heads/master 8e1d10b3f -> cbc43bbb6
PHOENIX-3663 - Implement resource controls in Phoenix JDBC driver.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cbc43bbb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cbc43bbb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cbc43bbb
Branch: refs/heads/master
Commit: cbc43bbb6fbcbd3bdcb5a246d02edecb0e939b43
Parents: 8e1d10b
Author: Geoffrey Jacoby <gj...@apache.org>
Authored: Tue Feb 28 16:34:26 2017 -0800
Committer: Geoffrey Jacoby <gj...@apache.org>
Committed: Tue Feb 28 16:34:26 2017 -0800
----------------------------------------------------------------------
.../phoenix/monitoring/PhoenixMetricsIT.java | 39 +++++++++++++++++++-
.../phoenix/exception/SQLExceptionCode.java | 5 ++-
.../apache/phoenix/jdbc/PhoenixConnection.java | 2 +
.../org/apache/phoenix/jdbc/PhoenixDriver.java | 3 --
.../phoenix/monitoring/GlobalClientMetrics.java | 8 +++-
.../apache/phoenix/monitoring/MetricType.java | 5 ++-
.../query/ConnectionQueryServicesImpl.java | 16 +++++++-
.../org/apache/phoenix/query/QueryServices.java | 5 ++-
.../phoenix/query/QueryServicesOptions.java | 4 +-
9 files changed, 75 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 4d075ab..04d125a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -17,6 +17,8 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
@@ -55,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.jdbc.PhoenixResultSet;
@@ -850,7 +853,41 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
exec.shutdownNow();
}
}
-
+
+ @Test
+ public void testGetConnectionsThrottledForSameUrl() throws Exception {
+ int expectedPhoenixConnections = 11;
+ List<Connection> connections = Lists.newArrayList();
+ String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort();
+ String url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum +
+ ':' + CUSTOM_URL_STRING + '=' + "throttletest";
+
+ Properties props = new Properties();
+ props.setProperty(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, "10");
+
+ GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset();
+ GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset();
+ GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.getMetric().reset();
+ GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.getMetric().reset();
+ boolean wasThrottled = false;
+ try {
+ for (int k = 0; k < expectedPhoenixConnections; k++) {
+ connections.add(DriverManager.getConnection(url, props));
+ }
+ } catch (SQLException se) {
+ wasThrottled = true;
+ assertEquals(SQLExceptionCode.NEW_CONNECTION_THROTTLED.getErrorCode(), se.getErrorCode());
+ } finally {
+ for (Connection c : connections) {
+ c.close();
+ }
+ }
+ assertEquals(1, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue());
+ assertTrue("No connection was throttled!", wasThrottled);
+ assertEquals(1, GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.getMetric().getValue());
+ assertEquals(expectedPhoenixConnections, GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.getMetric().getValue());
+ }
+
@Test
public void testGetConnectionsForDifferentTenantsConcurrently() throws Exception {
// establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 8595eda..1e48640 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -426,7 +426,10 @@ public enum SQLExceptionCode {
"Cannot create schema because config " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED
+ " for enabling name space mapping isn't enabled."), INCONSISTENET_NAMESPACE_MAPPING_PROPERTIES(
726, "43M10", " Inconsistent namespace mapping properites.."), ASYNC_NOT_ALLOWED(
- 727, "43M11", " ASYNC option is not allowed.. ");
+ 727, "43M11", " ASYNC option is not allowed.. "),
+ NEW_CONNECTION_THROTTLED(728, "410M1", "Could not create connection " +
+ "because this client already has the maximum number" +
+ " of connections to the target cluster.");
private final int errorCode;
private final String sqlState;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index cb2390e..5f5237f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.jdbc;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Collections.emptyMap;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
import java.io.EOFException;
import java.io.IOException;
@@ -214,6 +215,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
}
public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade) throws SQLException {
+ GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment();
this.url = url;
this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade;
// Copy so client cannot change
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index b2acacf..67ac9c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -149,8 +149,6 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
private Cache<ConnectionInfo, ConnectionQueryServices> initializeConnectionCache() {
Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
- int maxCacheSize = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_SIZE,
- QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE);
int maxCacheDuration = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS,
QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION);
RemovalListener<ConnectionInfo, ConnectionQueryServices> cacheRemovalListener =
@@ -170,7 +168,6 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
}
};
return CacheBuilder.newBuilder()
- .maximumSize(maxCacheSize)
.expireAfterAccess(maxCacheDuration, TimeUnit.MILLISECONDS)
.removalListener(cacheRemovalListener)
.build();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index fab4d27..b5f9422 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -39,7 +39,8 @@ import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
-
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -77,7 +78,10 @@ public enum GlobalClientMetrics {
GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER),
GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER),
GLOBAL_QUERY_SERVICES_COUNTER(QUERY_SERVICES_COUNTER),
- GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER);
+ GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER),
+ GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(PHOENIX_CONNECTIONS_THROTTLED_COUNTER),
+ GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER(PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER);
+
private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
private GlobalMetric metric;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index b420b75..7b21de5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -43,7 +43,10 @@ public enum MetricType {
RESULT_SET_TIME_MS("Wall clock time elapsed for reading all records using resultSet.next()"),
OPEN_PHOENIX_CONNECTIONS_COUNTER("Number of open phoenix connections"),
QUERY_SERVICES_COUNTER("Number of ConnectionQueryServicesImpl instantiated"),
- HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver");
+ HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver"),
+ PHOENIX_CONNECTIONS_THROTTLED_COUNTER("Number of client Phoenix connections prevented from opening " +
+ "because there are already too many to that target cluster."),
+ PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("Number of requests for Phoenix connections, whether successful or not.");
private final String description;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 2329432..03a5e13 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -49,6 +49,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
@@ -307,6 +308,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private final boolean renewLeaseEnabled;
private final boolean isAutoUpgradeEnabled;
private final AtomicBoolean upgradeRequired = new AtomicBoolean(false);
+ private final int maxConnectionsAllowed;
+ private final boolean shouldThrottleNumConnections;
public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
public static final byte[] UPGRADE_MUTEX_LOCKED = "UPGRADE_MUTEX_LOCKED".getBytes();
public static final byte[] UPGRADE_MUTEX_UNLOCKED = "UPGRADE_MUTEX_UNLOCKED".getBytes();
@@ -387,6 +390,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// A little bit of a smell to leak `this` here, but should not be a problem
this.tableStatsCache = new GuidePostsCache(this, config);
this.isAutoUpgradeEnabled = config.getBoolean(AUTO_UPGRADE_ENABLED, QueryServicesOptions.DEFAULT_AUTO_UPGRADE_ENABLED);
+ this.maxConnectionsAllowed = config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,
+ QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS);
+ this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0);
+
}
@Override
@@ -3796,12 +3803,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@Override
public void addConnection(PhoenixConnection connection) throws SQLException {
- connectionQueues.get(getQueueIndex(connection)).add(new WeakReference<PhoenixConnection>(connection));
- if (returnSequenceValues) {
+ if (returnSequenceValues || shouldThrottleNumConnections) {
synchronized (connectionCountLock) {
+ if (shouldThrottleNumConnections && connectionCount + 1 > maxConnectionsAllowed){
+ GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED).
+ build().buildException();
+ }
connectionCount++;
}
}
+ connectionQueues.get(getQueueIndex(connection)).add(new WeakReference<PhoenixConnection>(connection));
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 1366add..0b7b737 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -238,9 +238,12 @@ public interface QueryServices extends SQLCloseable {
public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding";
public static final String AUTO_UPGRADE_ENABLED = "phoenix.autoupgrade.enabled";
- public static final String CLIENT_CONNECTION_CACHE_MAX_SIZE = "phoenix.client.connection.cache.max.size";
public static final String CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS =
"phoenix.client.connection.max.duration";
+
+ //max number of connections from a single client to a single cluster. 0 is unlimited.
+ public static final String CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS =
+ "phoenix.client.connection.max.allowed.connections";
public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB = "phoenix.default.column.encoded.bytes.attrib";
public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.immutable.storage.scheme";
public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.multitenant.immutable.storage.scheme";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index f885d5c..4fd1344 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -268,12 +268,14 @@ public class QueryServicesOptions {
public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString();
public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true;
- public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE = 100;
public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION = 86400000;
public static final int DEFAULT_COLUMN_ENCODED_BYTES = QualifierEncodingScheme.TWO_BYTE_QUALIFIERS.getSerializedMetadataValue();
public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS.toString();
public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.ONE_CELL_PER_COLUMN.toString();
+ //by default, max connections from one client to one cluster is unlimited
+ public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0;
+
@SuppressWarnings("serial")
public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {
{