You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2021/01/31 15:26:01 UTC
[phoenix] 01/02: PHOENIX-5872 Close Internal Phoenix Connections
that were running during cancel
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit e8bf61ad4005ccd0aefbe234a38959897ce535c5
Author: Daniel Wong <da...@salesforce.com>
AuthorDate: Mon Apr 27 03:38:54 2020 -0700
PHOENIX-5872 Close Internal Phoenix Connections that were running during cancel
Signed-off-by: Xinyi Yan <ya...@apache.org>
---
phoenix-core/pom.xml | 4 +
.../phoenix/query/MaxConcurrentConnectionsIT.java | 132 +++++++++++++++++++++
.../apache/phoenix/util/DelayedRegionServer.java | 124 +++++++++++++++++++
.../compile/MutatingParallelIteratorFactory.java | 1 +
.../apache/phoenix/exception/SQLExceptionCode.java | 3 +
.../org/apache/phoenix/jdbc/PhoenixConnection.java | 59 +++++++--
.../phoenix/monitoring/GlobalClientMetrics.java | 2 +
.../org/apache/phoenix/monitoring/MetricType.java | 1 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 57 ++++++---
.../org/apache/phoenix/query/QueryServices.java | 3 +
.../apache/phoenix/query/QueryServicesOptions.java | 2 +
11 files changed, 365 insertions(+), 23 deletions(-)
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 33a8013..7b91753 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -318,6 +318,10 @@
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-miscellaneous</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase.thirdparty</groupId>
+ <artifactId>hbase-shaded-protobuf</artifactId>
+ </dependency>
<!-- HBase test dependencies -->
<dependency>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
new file mode 100644
index 0000000..7da276c
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.query;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.util.DelayedRegionServer;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.exception.SQLExceptionCode.NEW_CONNECTION_THROTTLED;
+import static org.apache.phoenix.exception.SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED;
+import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Note that some tests for concurrentConnections live in PhoenixMetricsIT.java which also test the metric emission
+ */
+public class MaxConcurrentConnectionsIT extends BaseUniqueNamesOwnClusterIT {
+
+ private static HBaseTestingUtility hbaseTestUtil;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ hbaseTestUtil = new HBaseTestingUtility();
+
+ hbaseTestUtil.startMiniCluster(1,1,null,null,DelayedRegionServer.class);
+ // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
+ String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+ url = PhoenixRuntime.JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkQuorum +
+ JDBC_PROTOCOL_SEPARATOR + "uniqueConn=A";
+ DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+ }
+
+ private String getUniqueUrl() {
+ return url + generateUniqueName();
+ }
+
+ //Have to shut down our special delayed region server
+ @AfterClass
+ public static void tearDown() throws Exception {
+ hbaseTestUtil.shutdownMiniCluster();
+ }
+
+ /**
+ * This tests the delete path which creates an internal phoenix connection per region
+ * @throws Exception
+ */
+ @Test
+ public void testDeleteRuntimeFailureClosesConnections() throws Exception {
+ String tableName = generateUniqueName();
+ String connectionUrl = getUniqueUrl();
+ //table with lots of regions
+ String ddl = "create table " + tableName + " (i integer not null primary key, j integer) SALT_BUCKETS=256 ";
+
+ Properties props = new Properties();
+ props.setProperty(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,String.valueOf(10));
+ props.setProperty(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,String.valueOf(10));
+
+ //delay any task handling as that causes additional connections
+ props.setProperty(TASK_HANDLING_INTERVAL_MS_ATTRIB,String.valueOf(600000));
+ props.setProperty(TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,String.valueOf(600000));
+
+ String deleteStmt = "DELETE FROM " + tableName + " WHERE 20 = j";
+
+ try(Connection conn = DriverManager.getConnection(connectionUrl, props); Statement statement = conn.createStatement()) {
+ statement.execute(ddl);
+ }
+
+ assertEquals(0, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue());
+ assertEquals(0, GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue());
+ Connection conn = null;
+ try {
+ conn = DriverManager.getConnection(connectionUrl, props);
+ //Enable delay for the delete
+ DelayedRegionServer.setDelayEnabled(true);
+ try (Statement statement = conn.createStatement()) {
+ statement.execute(deleteStmt);
+ }
+ fail();
+ } catch (SQLException e) {
+ assertEquals(NEW_INTERNAL_CONNECTION_THROTTLED.getErrorCode(), e.getErrorCode());
+ assertEquals(NEW_INTERNAL_CONNECTION_THROTTLED.getSQLState(), e.getSQLState());
+ } finally {
+ DelayedRegionServer.setDelayEnabled(false);
+ if (conn != null) {
+ conn.close();
+ }
+ long connections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue();
+ assertEquals(String.format("Found %d connections still open.", connections),0,connections);
+ connections = GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue();
+ assertEquals(String.format("Found %d internal connections still open.", connections),0 ,connections);
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java b/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java
new file mode 100644
index 0000000..95bf3f4
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * This is an extended MiniHBaseCluster Region Server which allows a developer/tester to inject
+ * delay into specific server side operations for testing.
+ */
+public class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DelayedRegionServer.class);
+
+ static boolean doDelay = false;
+ // Activate the delays after table creation to test get/scan/put
+ private static int DELAY_GET = 0;
+ private static int DELAY_SCAN = 30000;
+ private static int DELAY_MUTATE = 0;
+
+ public static void setDelayEnabled(boolean delay) {
+ doDelay = delay;
+ }
+
+ public static void setDelayGet(int delayGet) {
+ DELAY_GET = delayGet;
+ }
+
+ public static void setDelayScan(int delayScan) {
+ DELAY_SCAN = delayScan;
+ }
+
+ public static void setDelayMutate(int delayMutate) {
+ DELAY_MUTATE = delayMutate;
+ }
+
+ public DelayedRegionServer(Configuration conf)
+ throws IOException, InterruptedException {
+ super(conf);
+ }
+
+ @Override protected RSRpcServices createRpcServices() throws IOException {
+ return new DelayedRSRpcServices(this);
+ }
+
+ /**
+ * This class injects a delay into RPC calls, if delay is enabled, and then executes the super methods.
+ */
+ public static class DelayedRSRpcServices extends RSRpcServices {
+
+ DelayedRSRpcServices(HRegionServer rs) throws IOException {
+ super(rs);
+ }
+
+ @Override public GetResponse get(final RpcController controller,
+ final GetRequest request) throws ServiceException {
+ try {
+ if (doDelay) {
+ Thread.sleep(DELAY_GET);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("Sleep interrupted during get operation", e);
+ }
+ return super.get(controller, request);
+ }
+
+ @Override public MutateResponse mutate(final RpcController rpcc,
+ final MutateRequest request) throws ServiceException {
+ try {
+ if (doDelay) {
+ Thread.sleep(DELAY_MUTATE);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("Sleep interrupted during mutate operation", e);
+ }
+ return super.mutate(rpcc, request);
+ }
+
+ @Override public ScanResponse scan(final RpcController controller,
+ ScanRequest request) throws ServiceException {
+ try {
+ if (doDelay) {
+ Thread.sleep(DELAY_SCAN);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("Sleep interrupted during scan operation", e);
+ }
+ return super.scan(controller, request);
+ }
+ }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 4aaa72d..7113867 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -66,6 +66,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
QueryPlan plan) throws SQLException {
final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
+ connection.addChildConnection(clonedConnection);
try {
MutationState state = mutate(parentContext, iterator, clonedConnection);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index bb6d629..17157db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -524,6 +524,9 @@ public enum SQLExceptionCode {
info.getMutationSizeBytes());
}
}),
+ NEW_INTERNAL_CONNECTION_THROTTLED(731, "410M1", "Could not create connection " +
+ "because the internal connections already has the maximum number" +
+ " of connections to the target cluster."),
MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED(732,
"LIM03", "The Phoenix Column size is bigger than maximum " +
"HBase client key value allowed size for ONE_CELL_PER_COLUMN table, " +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 25f0f3e..ae47e7d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.jdbc;
import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
import static java.util.Collections.emptyMap;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
@@ -53,6 +54,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
@@ -173,6 +175,13 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
private Double logSamplingRate;
private String sourceOfOperation;
+ private Object queueCreationLock = new Object(); // lock for the lazy init path of childConnections structure
+ private ConcurrentLinkedQueue<PhoenixConnection> childConnections = null;
+
+ //For now just the copy constructor paths will have this as true as I don't want to change the
+ //public interfaces.
+ private final boolean isInternalConnection;
+
static {
Tracing.addTraceMetricsSource();
}
@@ -189,7 +198,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
this(connection.getQueryServices(), connection.getURL(), connection
.getClientInfo(), connection.metaData, connection
.getMutationState(), isDescRowKeyOrderUpgrade,
- isRunningUpgrade, connection.buildingIndex);
+ isRunningUpgrade, connection.buildingIndex, true);
this.isAutoCommit = connection.isAutoCommit;
this.isAutoFlush = connection.isAutoFlush;
this.sampler = connection.sampler;
@@ -206,7 +215,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
this(connection.getQueryServices(), connection.getURL(), connection
.getClientInfo(), connection.getMetaDataCache(), mutationState,
connection.isDescVarLengthRowKeyUpgrade(), connection
- .isRunningUpgrade(), connection.buildingIndex);
+ .isRunningUpgrade(), connection.buildingIndex, true);
}
public PhoenixConnection(PhoenixConnection connection, long scn)
@@ -217,7 +226,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
public PhoenixConnection(PhoenixConnection connection, Properties props) throws SQLException {
this(connection.getQueryServices(), connection.getURL(), props, connection.metaData, connection
.getMutationState(), connection.isDescVarLengthRowKeyUpgrade(),
- connection.isRunningUpgrade(), connection.buildingIndex);
+ connection.isRunningUpgrade(), connection.buildingIndex, true);
this.isAutoCommit = connection.isAutoCommit;
this.isAutoFlush = connection.isAutoFlush;
this.sampler = connection.sampler;
@@ -226,7 +235,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
public PhoenixConnection(ConnectionQueryServices services, String url,
Properties info, PMetaData metaData) throws SQLException {
- this(services, url, info, metaData, null, false, false, false);
+ this(services, url, info, metaData, null, false, false, false, false);
}
public PhoenixConnection(PhoenixConnection connection,
@@ -234,16 +243,17 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
throws SQLException {
this(services, connection.url, info, connection.metaData, null,
connection.isDescVarLengthRowKeyUpgrade(), connection
- .isRunningUpgrade(), connection.buildingIndex);
+ .isRunningUpgrade(), connection.buildingIndex, true);
}
private PhoenixConnection(ConnectionQueryServices services, String url,
Properties info, PMetaData metaData, MutationState mutationState,
boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade,
- boolean buildingIndex) throws SQLException {
+ boolean buildingIndex, boolean isInternalConnection) throws SQLException {
GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment();
this.url = url;
this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade;
+ this.isInternalConnection = isInternalConnection;
// Filter user provided properties based on property policy, if
// provided and QueryServices.PROPERTY_POLICY_PROVIDER_ENABLED is true
@@ -389,9 +399,13 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE,
QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE));
+ if(isInternalConnection) {
+ GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.increment();
+ } else {
+ GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
+ }
this.sourceOfOperation =
this.services.getProps().get(QueryServices.SOURCE_OPERATION_ATTRIB, null);
- GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
}
private static void checkScn(Long scnParam) throws SQLException {
@@ -444,6 +458,26 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
return result.build();
}
+ public boolean isInternalConnection() {
+ return isInternalConnection;
+ }
+
+ /**
+ * This method, and *only* this method is thread safe
+ * @param connection
+ */
+ public void addChildConnection(PhoenixConnection connection) {
+ //double check for performance
+ if(childConnections == null) {
+ synchronized (queueCreationLock) {
+ if (childConnections == null) {
+ childConnections = new ConcurrentLinkedQueue<>();
+ }
+ }
+ }
+ childConnections.add(connection);
+ }
+
public Sampler<?> getSampler() {
return this.sampler;
}
@@ -658,13 +692,22 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
traceScope.close();
}
closeStatements();
+ synchronized (queueCreationLock) {
+ if (childConnections != null) {
+ SQLCloseables.closeAllQuietly(childConnections);
+ }
+ }
} finally {
services.removeConnection(this);
}
} finally {
isClosed = true;
- GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
+ if(isInternalConnection()){
+ GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement();
+ } else {
+ GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index 9ebc1af..dce01d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.INDEX_COMMIT_FAILURE_SIZE;
import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
@@ -99,6 +100,7 @@ public enum GlobalClientMetrics {
GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER),
GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER),
GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER),
+ GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS(OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER),
GLOBAL_QUERY_SERVICES_COUNTER(QUERY_SERVICES_COUNTER),
GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER),
GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(PHOENIX_CONNECTIONS_THROTTLED_COUNTER),
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 1396a89..40fcad0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -58,6 +58,7 @@ public enum MetricType {
WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE),
RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE),
OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE),
+ OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER("io", "Number of open internal phoenix connections",LogLevel.OFF, PLong.INSTANCE),
QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE),
HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver",LogLevel.OFF, PLong.INSTANCE),
PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix connections prevented from opening " +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 597ae27..585832e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -339,6 +339,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@GuardedBy("connectionCountLock")
private int connectionCount = 0;
+
+ @GuardedBy("connectionCountLock")
+ private int internalConnectionCount = 0;
+
private final Object connectionCountLock = new Object();
private final boolean returnSequenceValues ;
@@ -370,6 +374,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private final boolean isAutoUpgradeEnabled;
private final AtomicBoolean upgradeRequired = new AtomicBoolean(false);
private final int maxConnectionsAllowed;
+ private final int maxInternalConnectionsAllowed;
private final boolean shouldThrottleNumConnections;
public static final byte[] MUTEX_LOCKED = "MUTEX_LOCKED".getBytes();
@@ -455,7 +460,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
this.isAutoUpgradeEnabled = config.getBoolean(AUTO_UPGRADE_ENABLED, QueryServicesOptions.DEFAULT_AUTO_UPGRADE_ENABLED);
this.maxConnectionsAllowed = config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,
QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS);
- this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0);
+ this.maxInternalConnectionsAllowed = config.getInt(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,
+ QueryServicesOptions.DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS);
+ this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0) || (maxInternalConnectionsAllowed > 0);
if (!QueryUtil.isServerConnection(props)) {
//Start queryDistruptor everytime as log level can be change at connection level as well, but we can avoid starting for server connections.
try {
@@ -5221,12 +5228,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public void addConnection(PhoenixConnection connection) throws SQLException {
if (returnSequenceValues || shouldThrottleNumConnections) {
synchronized (connectionCountLock) {
- if (shouldThrottleNumConnections && connectionCount + 1 > maxConnectionsAllowed){
- GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED).
- build().buildException();
+
+ /*
+ * If we are throttling connections, internal connections and client-created connections
+ * are counted separately, each against its respective quota.
+ */
+ if(shouldThrottleNumConnections) {
+ int futureConnections = 1 + ( connection.isInternalConnection() ? internalConnectionCount : connectionCount);
+ int allowedConnections = connection.isInternalConnection() ? maxInternalConnectionsAllowed : maxConnectionsAllowed;
+ if(allowedConnections != 0 && futureConnections > allowedConnections) {
+ GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
+ if(connection.isInternalConnection()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED).
+ build().buildException();
+ }
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED).
+ build().buildException();
+ }
+ }
+
+ if(!connection.isInternalConnection()) {
+ connectionCount++;
+ } else {
+ internalConnectionCount++;
}
- connectionCount++;
}
}
// If lease renewal isn't enabled, these are never cleaned up. Tracking when renewals
@@ -5241,15 +5266,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (returnSequenceValues) {
ConcurrentMap<SequenceKey,Sequence> formerSequenceMap = null;
synchronized (connectionCountLock) {
- if (--connectionCount <= 0) {
- if (!this.sequenceMap.isEmpty()) {
- formerSequenceMap = this.sequenceMap;
- this.sequenceMap = Maps.newConcurrentMap();
+ if(!connection.isInternalConnection()) {
+ if (connectionCount + internalConnectionCount - 1 <= 0) {
+ if (!this.sequenceMap.isEmpty()) {
+ formerSequenceMap = this.sequenceMap;
+ this.sequenceMap = Maps.newConcurrentMap();
+ }
}
}
- if (connectionCount < 0) {
- connectionCount = 0;
- }
}
// Since we're using the former sequenceMap, we can do this outside
// the lock.
@@ -5257,9 +5281,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// When there are no more connections, attempt to return any sequences
returnAllSequences(formerSequenceMap);
}
- } else if (shouldThrottleNumConnections){ //still need to decrement connection count
+ }
+ if (returnSequenceValues || shouldThrottleNumConnections){ //still need to decrement connection count
synchronized (connectionCountLock) {
- if (connectionCount > 0) {
+ if(connection.isInternalConnection() && internalConnectionCount > 0) {
+ --internalConnectionCount;
+ } else if (connectionCount > 0) {
--connectionCount;
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index c869f2b..ac9a396 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -280,6 +280,9 @@ public interface QueryServices extends SQLCloseable {
//max number of connections from a single client to a single cluster. 0 is unlimited.
public static final String CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS =
"phoenix.client.connection.max.allowed.connections";
+ //max number of internal connections from a single client to a single cluster. 0 is unlimited.
+ public static final String INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS =
+ "phoenix.internal.connection.max.allowed.connections";
public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB = "phoenix.default.column.encoded.bytes.attrib";
public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.immutable.storage.scheme";
public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.multitenant.immutable.storage.scheme";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 7bc34e6..85f932b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -320,6 +320,8 @@ public class QueryServicesOptions {
//by default, max connections from one client to one cluster is unlimited
public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0;
+ //by default, max internal connections from one client to one cluster is unlimited
+ public static final int DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0;
public static final boolean DEFAULT_STATS_COLLECTION_ENABLED = true;
public static final boolean DEFAULT_USE_STATS_FOR_PARALLELIZATION = true;