You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ya...@apache.org on 2020/04/30 03:23:04 UTC
[phoenix] branch 4.x updated: PHOENIX-5872 Close Internal Phoenix
Connections that were running during cancel
This is an automated email from the ASF dual-hosted git repository.
yanxinyi pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 6372e95 PHOENIX-5872 Close Internal Phoenix Connections that were running during cancel
6372e95 is described below
commit 6372e95c35f5498e4a2091bffab0e3c77cd984a6
Author: Daniel Wong <da...@salesforce.com>
AuthorDate: Mon Apr 27 03:38:54 2020 -0700
PHOENIX-5872 Close Internal Phoenix Connections that were running during cancel
Signed-off-by: Xinyi Yan <ya...@apache.org>
---
.../phoenix/query/MaxConcurrentConnectionsIT.java | 132 +++++++++++++++++++++
.../apache/phoenix/util/DelayedRegionServer.java | 119 +++++++++++++++++++
.../compile/MutatingParallelIteratorFactory.java | 1 +
.../apache/phoenix/exception/SQLExceptionCode.java | 3 +
.../org/apache/phoenix/jdbc/PhoenixConnection.java | 59 +++++++--
.../phoenix/monitoring/GlobalClientMetrics.java | 2 +
.../org/apache/phoenix/monitoring/MetricType.java | 1 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 57 ++++++---
.../org/apache/phoenix/query/QueryServices.java | 3 +
.../apache/phoenix/query/QueryServicesOptions.java | 2 +
10 files changed, 356 insertions(+), 23 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
new file mode 100644
index 0000000..7da276c
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.query;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.util.DelayedRegionServer;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.exception.SQLExceptionCode.NEW_CONNECTION_THROTTLED;
+import static org.apache.phoenix.exception.SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED;
+import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Note that some tests for concurrentConnections live in PhoenixMetricsIT.java which also test the metric emission
+ */
+public class MaxConcurrentConnectionsIT extends BaseUniqueNamesOwnClusterIT {
+
+ private static HBaseTestingUtility hbaseTestUtil;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ hbaseTestUtil = new HBaseTestingUtility();
+
+ hbaseTestUtil.startMiniCluster(1,1,null,null,DelayedRegionServer.class);
+ // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
+ String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+ url = PhoenixRuntime.JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkQuorum +
+ JDBC_PROTOCOL_SEPARATOR + "uniqueConn=A";
+ DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+ }
+
+ private String getUniqueUrl() {
+ return url + generateUniqueName();
+ }
+
+ //Have to shutdown our special delayed region server
+ @AfterClass
+ public static void tearDown() throws Exception {
+ hbaseTestUtil.shutdownMiniCluster();
+ }
+
+ /**
+ * This tests the delete path which creates a internal phoenix connection per region
+ * @throws Exception
+ */
+ @Test
+ public void testDeleteRuntimeFailureClosesConnections() throws Exception {
+ String tableName = generateUniqueName();
+ String connectionUrl = getUniqueUrl();
+ //table with lots of regions
+ String ddl = "create table " + tableName + " (i integer not null primary key, j integer) SALT_BUCKETS=256 ";
+
+ Properties props = new Properties();
+ props.setProperty(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,String.valueOf(10));
+ props.setProperty(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,String.valueOf(10));
+
+ //delay any task handeling as that causes additional connections
+ props.setProperty(TASK_HANDLING_INTERVAL_MS_ATTRIB,String.valueOf(600000));
+ props.setProperty(TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,String.valueOf(600000));
+
+ String deleteStmt = "DELETE FROM " + tableName + " WHERE 20 = j";
+
+ try(Connection conn = DriverManager.getConnection(connectionUrl, props); Statement statement = conn.createStatement()) {
+ statement.execute(ddl);
+ }
+
+ assertEquals(0, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue());
+ assertEquals(0, GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue());
+ Connection conn = null;
+ try {
+ conn = DriverManager.getConnection(connectionUrl, props);
+ //Enable delay for the delete
+ DelayedRegionServer.setDelayEnabled(true);
+ try (Statement statement = conn.createStatement()) {
+ statement.execute(deleteStmt);
+ }
+ fail();
+ } catch (SQLException e) {
+ assertEquals(NEW_INTERNAL_CONNECTION_THROTTLED.getErrorCode(), e.getErrorCode());
+ assertEquals(NEW_INTERNAL_CONNECTION_THROTTLED.getSQLState(), e.getSQLState());
+ } finally {
+ DelayedRegionServer.setDelayEnabled(false);
+ if (conn != null) {
+ conn.close();
+ }
+ long connections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue();
+ assertEquals(String.format("Found %d connections still open.", connections),0,connections);
+ connections = GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue();
+ assertEquals(String.format("Found %d internal connections still open.", connections),0 ,connections);
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java b/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java
new file mode 100644
index 0000000..bf02621
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.util;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * This is a extended MiniHbaseCluster Region Server whcih allows developer/tester to inject
+ * delay into specific server side operations for testing.
+ */
+public class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DelayedRegionServer.class);
+
+ static boolean doDelay = false;
+ // Activate the delays after table creation to test get/scan/put
+ private static int DELAY_GET = 0;
+ private static int DELAY_SCAN = 30000;
+ private static int DELAY_MUTATE = 0;
+
+ public static void setDelayEnabled(boolean delay) {
+ doDelay = delay;
+ }
+
+ public static void setDelayGet(int delayGet) {
+ DELAY_GET = delayGet;
+ }
+
+ public static void setDelayScan(int delayScan) {
+ DELAY_SCAN = delayScan;
+ }
+
+ public static void setDelayMutate(int delayMutate) {
+ DELAY_MUTATE = delayMutate;
+ }
+
+ public DelayedRegionServer(Configuration conf, CoordinatedStateManager cp)
+ throws IOException, InterruptedException {
+ super(conf, cp);
+ }
+
+ @Override protected RSRpcServices createRpcServices() throws IOException {
+ return new DelayedRSRpcServices(this);
+ }
+
+ /**
+ * This class injects delay for Rpc calls and after executes super methods is delay is set.
+ */
+ public static class DelayedRSRpcServices extends RSRpcServices {
+
+ DelayedRSRpcServices(HRegionServer rs) throws IOException {
+ super(rs);
+ }
+
+ @Override public ClientProtos.GetResponse get(RpcController controller,
+ ClientProtos.GetRequest request) throws ServiceException {
+ try {
+ if (doDelay) {
+ Thread.sleep(DELAY_GET);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("Sleep interrupted during get operation", e);
+ }
+ return super.get(controller, request);
+ }
+
+ @Override public ClientProtos.MutateResponse mutate(RpcController rpcc,
+ ClientProtos.MutateRequest request) throws ServiceException {
+ try {
+ if (doDelay) {
+ Thread.sleep(DELAY_MUTATE);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("Sleep interrupted during mutate operation", e);
+ }
+ return super.mutate(rpcc, request);
+ }
+
+ @Override public ClientProtos.ScanResponse scan(RpcController controller,
+ ClientProtos.ScanRequest request) throws ServiceException {
+ try {
+ if (doDelay) {
+ Thread.sleep(DELAY_SCAN);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("Sleep interrupted during scan operation", e);
+ }
+ return super.scan(controller, request);
+ }
+ }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index c98862d..755f127 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -64,6 +64,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
QueryPlan plan) throws SQLException {
final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
+ connection.addChildConnection(clonedConnection);
try {
MutationState state = mutate(parentContext, iterator, clonedConnection);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 3b4eeb0..055aa1f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -513,6 +513,9 @@ public enum SQLExceptionCode {
info.getMutationSizeBytes());
}
}),
+ NEW_INTERNAL_CONNECTION_THROTTLED(731, "410M1", "Could not create connection " +
+ "because the internal connections already has the maximum number" +
+ " of connections to the target cluster."),
INSUFFICIENT_MEMORY(999, "50M01", "Unable to allocate enough memory."),
HASH_JOIN_CACHE_NOT_FOUND(900, "HJ01", "Hash Join cache not found"),
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index d302bcf..7cb9982 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.jdbc;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Collections.emptyMap;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
@@ -53,6 +54,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
@@ -172,6 +174,13 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
private LogLevel logLevel;
private Double logSamplingRate;
+    // Lock for the lazy init path of the childConnections structure. Final so that every
+    // thread always synchronizes on the same monitor.
+    private final Object queueCreationLock = new Object();
+    // Created lazily on first use. Volatile so that a reference read outside the lock is
+    // safely published together with the queue it points to.
+    private volatile ConcurrentLinkedQueue<PhoenixConnection> childConnections = null;
+
+    //For now just the copy constructor paths will have this as true as I don't want to change the
+    //public interfaces.
+    private final boolean isInternalConnection;
+
static {
Tracing.addTraceMetricsSource();
}
@@ -188,7 +197,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
this(connection.getQueryServices(), connection.getURL(), connection
.getClientInfo(), connection.metaData, connection
.getMutationState(), isDescRowKeyOrderUpgrade,
- isRunningUpgrade, connection.buildingIndex);
+ isRunningUpgrade, connection.buildingIndex, true);
this.isAutoCommit = connection.isAutoCommit;
this.isAutoFlush = connection.isAutoFlush;
this.sampler = connection.sampler;
@@ -205,7 +214,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
this(connection.getQueryServices(), connection.getURL(), connection
.getClientInfo(), connection.getMetaDataCache(), mutationState,
connection.isDescVarLengthRowKeyUpgrade(), connection
- .isRunningUpgrade(), connection.buildingIndex);
+ .isRunningUpgrade(), connection.buildingIndex, true);
}
public PhoenixConnection(PhoenixConnection connection, long scn)
@@ -216,7 +225,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
public PhoenixConnection(PhoenixConnection connection, Properties props) throws SQLException {
this(connection.getQueryServices(), connection.getURL(), props, connection.metaData, connection
.getMutationState(), connection.isDescVarLengthRowKeyUpgrade(),
- connection.isRunningUpgrade(), connection.buildingIndex);
+ connection.isRunningUpgrade(), connection.buildingIndex, true);
this.isAutoCommit = connection.isAutoCommit;
this.isAutoFlush = connection.isAutoFlush;
this.sampler = connection.sampler;
@@ -225,7 +234,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
public PhoenixConnection(ConnectionQueryServices services, String url,
Properties info, PMetaData metaData) throws SQLException {
- this(services, url, info, metaData, null, false, false, false);
+ this(services, url, info, metaData, null, false, false, false, false);
}
public PhoenixConnection(PhoenixConnection connection,
@@ -233,16 +242,17 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
throws SQLException {
this(services, connection.url, info, connection.metaData, null,
connection.isDescVarLengthRowKeyUpgrade(), connection
- .isRunningUpgrade(), connection.buildingIndex);
+ .isRunningUpgrade(), connection.buildingIndex, true);
}
private PhoenixConnection(ConnectionQueryServices services, String url,
Properties info, PMetaData metaData, MutationState mutationState,
boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade,
- boolean buildingIndex) throws SQLException {
+ boolean buildingIndex, boolean isInternalConnection) throws SQLException {
GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment();
this.url = url;
this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade;
+ this.isInternalConnection = isInternalConnection;
// Filter user provided properties based on property policy, if
// provided and QueryServices.PROPERTY_POLICY_PROVIDER_ENABLED is true
@@ -388,7 +398,11 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE,
QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE));
- GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
+ if(isInternalConnection) {
+ GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.increment();
+ } else {
+ GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
+ }
}
private static void checkScn(Long scnParam) throws SQLException {
@@ -441,6 +455,26 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
return result.build();
}
+    /**
+     * Reports whether this connection was flagged as internal when it was constructed.
+     *
+     * @return the value of the {@code isInternalConnection} flag
+     */
+    public boolean isInternalConnection() {
+        return this.isInternalConnection;
+    }
+
+    /**
+     * Registers a child connection with this connection. The child queue is created on the
+     * first call.
+     * <p>
+     * This method, and *only* this method is thread safe. The lazy initialisation is done
+     * entirely under {@code queueCreationLock}; an unsynchronized null check on the field is
+     * only safe if the field is volatile, so it is not used here.
+     *
+     * @param connection the child connection to track
+     */
+    public void addChildConnection(PhoenixConnection connection) {
+        ConcurrentLinkedQueue<PhoenixConnection> children;
+        synchronized (queueCreationLock) {
+            if (childConnections == null) {
+                childConnections = new ConcurrentLinkedQueue<>();
+            }
+            children = childConnections;
+        }
+        // The queue itself is concurrent, so the add does not need the lock.
+        children.add(connection);
+    }
+
public Sampler<?> getSampler() {
return this.sampler;
}
@@ -655,13 +689,22 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
traceScope.close();
}
closeStatements();
+ synchronized (queueCreationLock) {
+ if (childConnections != null) {
+ SQLCloseables.closeAllQuietly(childConnections);
+ }
+ }
} finally {
services.removeConnection(this);
}
} finally {
isClosed = true;
- GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
+ if(isInternalConnection()){
+ GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement();
+ } else {
+ GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index f41ab52..ce73a79 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.QUERY_SERVICES_COUNTER;
@@ -93,6 +94,7 @@ public enum GlobalClientMetrics {
GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER),
GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER),
GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER),
+ GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS(OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER),
GLOBAL_QUERY_SERVICES_COUNTER(QUERY_SERVICES_COUNTER),
GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER),
GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(PHOENIX_CONNECTIONS_THROTTLED_COUNTER),
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 8e1de66..41d9e19 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -57,6 +57,7 @@ public enum MetricType {
WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE),
RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE),
OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE),
+ OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER("io", "Number of open internal phoenix connections",LogLevel.OFF, PLong.INSTANCE),
QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE),
HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver",LogLevel.OFF, PLong.INSTANCE),
PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix connections prevented from opening " +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b708b75..c504df4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -319,6 +319,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@GuardedBy("connectionCountLock")
private int connectionCount = 0;
+
+ @GuardedBy("connectionCountLock")
+ private int internalConnectionCount = 0;
+
private final Object connectionCountLock = new Object();
private final boolean returnSequenceValues ;
@@ -350,6 +354,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private final boolean isAutoUpgradeEnabled;
private final AtomicBoolean upgradeRequired = new AtomicBoolean(false);
private final int maxConnectionsAllowed;
+ private final int maxInternalConnectionsAllowed;
private final boolean shouldThrottleNumConnections;
public static final byte[] MUTEX_LOCKED = "MUTEX_LOCKED".getBytes();
@@ -436,7 +441,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
this.isAutoUpgradeEnabled = config.getBoolean(AUTO_UPGRADE_ENABLED, QueryServicesOptions.DEFAULT_AUTO_UPGRADE_ENABLED);
this.maxConnectionsAllowed = config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,
QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS);
- this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0);
+ this.maxInternalConnectionsAllowed = config.getInt(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,
+ QueryServicesOptions.DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS);
+ this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0) || (maxInternalConnectionsAllowed > 0);
if (!QueryUtil.isServerConnection(props)) {
//Start queryDistruptor everytime as log level can be change at connection level as well, but we can avoid starting for server connections.
try {
@@ -4957,12 +4964,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public void addConnection(PhoenixConnection connection) throws SQLException {
if (returnSequenceValues || shouldThrottleNumConnections) {
synchronized (connectionCountLock) {
- if (shouldThrottleNumConnections && connectionCount + 1 > maxConnectionsAllowed){
- GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED).
- build().buildException();
+
+ /*
+ * If we are throttling connections, internal connections and client-created
+ * connections are counted separately, each against its own quota.
+ */
+ if(shouldThrottleNumConnections) {
+ int futureConnections = 1 + ( connection.isInternalConnection() ? internalConnectionCount : connectionCount);
+ int allowedConnections = connection.isInternalConnection() ? maxInternalConnectionsAllowed : maxConnectionsAllowed;
+ if(allowedConnections != 0 && futureConnections > allowedConnections) {
+ GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
+ if(connection.isInternalConnection()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED).
+ build().buildException();
+ }
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED).
+ build().buildException();
+ }
+ }
+
+ if(!connection.isInternalConnection()) {
+ connectionCount++;
+ } else {
+ internalConnectionCount++;
}
- connectionCount++;
}
}
// If lease renewal isn't enabled, these are never cleaned up. Tracking when renewals
@@ -4977,15 +5002,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (returnSequenceValues) {
ConcurrentMap<SequenceKey,Sequence> formerSequenceMap = null;
synchronized (connectionCountLock) {
- if (--connectionCount <= 0) {
- if (!this.sequenceMap.isEmpty()) {
- formerSequenceMap = this.sequenceMap;
- this.sequenceMap = Maps.newConcurrentMap();
+ if(!connection.isInternalConnection()) {
+ if (connectionCount + internalConnectionCount - 1 <= 0) {
+ if (!this.sequenceMap.isEmpty()) {
+ formerSequenceMap = this.sequenceMap;
+ this.sequenceMap = Maps.newConcurrentMap();
+ }
}
}
- if (connectionCount < 0) {
- connectionCount = 0;
- }
}
// Since we're using the former sequenceMap, we can do this outside
// the lock.
@@ -4993,9 +5017,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// When there are no more connections, attempt to return any sequences
returnAllSequences(formerSequenceMap);
}
- } else if (shouldThrottleNumConnections){ //still need to decrement connection count
+ }
+ if (returnSequenceValues || shouldThrottleNumConnections){ //still need to decrement connection count
synchronized (connectionCountLock) {
- if (connectionCount > 0) {
+ if(connection.isInternalConnection() && internalConnectionCount > 0) {
+ --internalConnectionCount;
+ } else if (connectionCount > 0) {
--connectionCount;
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 34af089..77adc4f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -304,6 +304,9 @@ public interface QueryServices extends SQLCloseable {
//max number of connections from a single client to a single cluster. 0 is unlimited.
public static final String CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS =
"phoenix.client.connection.max.allowed.connections";
+ //max number of internal connections from a single client to a single cluster. 0 is unlimited.
+ public static final String INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS =
+ "phoenix.internal.connection.max.allowed.connections";
public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB = "phoenix.default.column.encoded.bytes.attrib";
public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.immutable.storage.scheme";
public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.multitenant.immutable.storage.scheme";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 4e62e25..cfa0aec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -343,6 +343,8 @@ public class QueryServicesOptions {
//by default, max connections from one client to one cluster is unlimited
public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0;
+ //by default, max internal connections from one client to one cluster is unlimited
+ public static final int DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0;
public static final boolean DEFAULT_STATS_COLLECTION_ENABLED = true;
public static final boolean DEFAULT_USE_STATS_FOR_PARALLELIZATION = true;