You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2021/01/31 15:26:01 UTC

[phoenix] 01/02: PHOENIX-5872 Close Internal Phoenix Connections that were running during cancel

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit e8bf61ad4005ccd0aefbe234a38959897ce535c5
Author: Daniel Wong <da...@salesforce.com>
AuthorDate: Mon Apr 27 03:38:54 2020 -0700

    PHOENIX-5872 Close Internal Phoenix Connections that were running during cancel
    
    Signed-off-by: Xinyi Yan <ya...@apache.org>
---
 phoenix-core/pom.xml                               |   4 +
 .../phoenix/query/MaxConcurrentConnectionsIT.java  | 132 +++++++++++++++++++++
 .../apache/phoenix/util/DelayedRegionServer.java   | 124 +++++++++++++++++++
 .../compile/MutatingParallelIteratorFactory.java   |   1 +
 .../apache/phoenix/exception/SQLExceptionCode.java |   3 +
 .../org/apache/phoenix/jdbc/PhoenixConnection.java |  59 +++++++--
 .../phoenix/monitoring/GlobalClientMetrics.java    |   2 +
 .../org/apache/phoenix/monitoring/MetricType.java  |   1 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |  57 ++++++---
 .../org/apache/phoenix/query/QueryServices.java    |   3 +
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +
 11 files changed, 365 insertions(+), 23 deletions(-)

diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 33a8013..7b91753 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -318,6 +318,10 @@
       <groupId>org.apache.hbase.thirdparty</groupId>
       <artifactId>hbase-shaded-miscellaneous</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase.thirdparty</groupId>
+      <artifactId>hbase-shaded-protobuf</artifactId>
+    </dependency>
 
     <!-- HBase test dependencies -->
     <dependency>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
new file mode 100644
index 0000000..7da276c
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.query;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.util.DelayedRegionServer;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.exception.SQLExceptionCode.NEW_CONNECTION_THROTTLED;
+import static org.apache.phoenix.exception.SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED;
+import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Note that some tests for concurrentConnections live in PhoenixMetricsIT.java which also test the metric emission
+ */
+public class MaxConcurrentConnectionsIT extends BaseUniqueNamesOwnClusterIT {
+
+    private static HBaseTestingUtility hbaseTestUtil;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        hbaseTestUtil = new HBaseTestingUtility();
+
+        hbaseTestUtil.startMiniCluster(1,1,null,null,DelayedRegionServer.class);
+        // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
+        String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+        url = PhoenixRuntime.JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkQuorum +
+                JDBC_PROTOCOL_SEPARATOR + "uniqueConn=A";
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    private String getUniqueUrl() {
+        return url + generateUniqueName();
+    }
+
+    //Have to shutdown our special delayed region server
+    @AfterClass
+    public static void tearDown() throws Exception {
+        hbaseTestUtil.shutdownMiniCluster();
+    }
+
+    /**
+     * This tests the delete path which creates a internal phoenix connection per region
+     * @throws Exception
+     */
+    @Test
+    public void testDeleteRuntimeFailureClosesConnections() throws Exception {
+        String tableName = generateUniqueName();
+        String connectionUrl = getUniqueUrl();
+        //table with lots of regions
+        String ddl = "create table " + tableName +  "  (i integer not null primary key, j integer) SALT_BUCKETS=256 ";
+
+        Properties props = new Properties();
+        props.setProperty(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,String.valueOf(10));
+        props.setProperty(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,String.valueOf(10));
+
+        //delay any task handeling as that causes additional connections
+        props.setProperty(TASK_HANDLING_INTERVAL_MS_ATTRIB,String.valueOf(600000));
+        props.setProperty(TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,String.valueOf(600000));
+
+        String deleteStmt = "DELETE FROM " + tableName + " WHERE 20 = j";
+
+        try(Connection conn = DriverManager.getConnection(connectionUrl, props); Statement statement = conn.createStatement()) {
+            statement.execute(ddl);
+        }
+
+        assertEquals(0, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue());
+        assertEquals(0, GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue());
+        Connection conn = null;
+        try {
+            conn = DriverManager.getConnection(connectionUrl, props);
+            //Enable delay for the delete
+            DelayedRegionServer.setDelayEnabled(true);
+            try (Statement statement = conn.createStatement()) {
+                statement.execute(deleteStmt);
+            }
+            fail();
+        } catch (SQLException e) {
+            assertEquals(NEW_INTERNAL_CONNECTION_THROTTLED.getErrorCode(), e.getErrorCode());
+            assertEquals(NEW_INTERNAL_CONNECTION_THROTTLED.getSQLState(), e.getSQLState());
+        } finally {
+            DelayedRegionServer.setDelayEnabled(false);
+            if (conn != null) {
+                conn.close();
+            }
+            long connections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue();
+            assertEquals(String.format("Found %d connections still open.", connections),0,connections);
+            connections = GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue();
+            assertEquals(String.format("Found %d internal connections still open.", connections),0 ,connections);
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java b/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java
new file mode 100644
index 0000000..95bf3f4
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * This is a extended MiniHbaseCluster Region Server whcih allows developer/tester to inject
+ * delay into specific server side operations for testing.
+ */
+public class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DelayedRegionServer.class);
+
+    static boolean doDelay = false;
+    // Activate the delays after table creation to test get/scan/put
+    private static int DELAY_GET = 0;
+    private static int DELAY_SCAN = 30000;
+    private static int DELAY_MUTATE = 0;
+
+    public static void setDelayEnabled(boolean delay) {
+        doDelay = delay;
+    }
+
+    public static void setDelayGet(int delayGet) {
+        DELAY_GET = delayGet;
+    }
+
+    public static void setDelayScan(int delayScan) {
+        DELAY_SCAN = delayScan;
+    }
+
+    public static void setDelayMutate(int delayMutate) {
+        DELAY_MUTATE = delayMutate;
+    }
+
+    public DelayedRegionServer(Configuration conf)
+            throws IOException, InterruptedException {
+        super(conf);
+    }
+
+    @Override protected RSRpcServices createRpcServices() throws IOException {
+        return new DelayedRSRpcServices(this);
+    }
+
+    /**
+     * This class injects delay for Rpc calls and after executes super methods is delay is set.
+     */
+    public static class DelayedRSRpcServices extends RSRpcServices {
+
+        DelayedRSRpcServices(HRegionServer rs) throws IOException {
+            super(rs);
+        }
+
+        @Override public GetResponse get(final RpcController controller,
+                final GetRequest request) throws ServiceException {
+            try {
+                if (doDelay) {
+                    Thread.sleep(DELAY_GET);
+                }
+            } catch (InterruptedException e) {
+                LOGGER.error("Sleep interrupted during get operation", e);
+            }
+            return super.get(controller, request);
+        }
+
+        @Override public MutateResponse mutate(final RpcController rpcc,
+                final MutateRequest request) throws ServiceException {
+            try {
+                if (doDelay) {
+                    Thread.sleep(DELAY_MUTATE);
+                }
+            } catch (InterruptedException e) {
+                LOGGER.error("Sleep interrupted during mutate operation", e);
+            }
+            return super.mutate(rpcc, request);
+        }
+
+        @Override public ScanResponse scan(final RpcController controller,
+                ScanRequest request) throws ServiceException {
+            try {
+                if (doDelay) {
+                    Thread.sleep(DELAY_SCAN);
+                }
+            } catch (InterruptedException e) {
+                LOGGER.error("Sleep interrupted during scan operation", e);
+            }
+            return super.scan(controller, request);
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 4aaa72d..7113867 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -66,6 +66,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
             QueryPlan plan) throws SQLException {
 
         final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
+        connection.addChildConnection(clonedConnection);
         try {
             MutationState state = mutate(parentContext, iterator, clonedConnection);
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index bb6d629..17157db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -524,6 +524,9 @@ public enum SQLExceptionCode {
                     info.getMutationSizeBytes());
         }
     }),
+    NEW_INTERNAL_CONNECTION_THROTTLED(731, "410M1", "Could not create connection " +
+            "because the internal connections already has the maximum number" +
+            " of connections to the target cluster."),
     MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED(732,
             "LIM03", "The Phoenix Column size is bigger than maximum " +
             "HBase client key value allowed size for ONE_CELL_PER_COLUMN table, " +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 25f0f3e..ae47e7d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.jdbc;
 
 import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Collections.emptyMap;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
 
@@ -53,6 +54,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -173,6 +175,13 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
     private Double logSamplingRate;
     private String sourceOfOperation;
 
+    private Object queueCreationLock = new Object(); // lock for the lazy init path of childConnections structure
+    private ConcurrentLinkedQueue<PhoenixConnection> childConnections = null;
+
+    //For now just the copy constructor paths will have this as true as I don't want to change the
+    //public interfaces.
+    private final boolean isInternalConnection;
+
     static {
         Tracing.addTraceMetricsSource();
     }
@@ -189,7 +198,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         this(connection.getQueryServices(), connection.getURL(), connection
                 .getClientInfo(), connection.metaData, connection
                 .getMutationState(), isDescRowKeyOrderUpgrade,
-                isRunningUpgrade, connection.buildingIndex);
+                isRunningUpgrade, connection.buildingIndex, true);
         this.isAutoCommit = connection.isAutoCommit;
         this.isAutoFlush = connection.isAutoFlush;
         this.sampler = connection.sampler;
@@ -206,7 +215,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         this(connection.getQueryServices(), connection.getURL(), connection
                 .getClientInfo(), connection.getMetaDataCache(), mutationState,
                 connection.isDescVarLengthRowKeyUpgrade(), connection
-                .isRunningUpgrade(), connection.buildingIndex);
+                .isRunningUpgrade(), connection.buildingIndex, true);
     }
 
     public PhoenixConnection(PhoenixConnection connection, long scn)
@@ -217,7 +226,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
 	public PhoenixConnection(PhoenixConnection connection, Properties props) throws SQLException {
         this(connection.getQueryServices(), connection.getURL(), props, connection.metaData, connection
                 .getMutationState(), connection.isDescVarLengthRowKeyUpgrade(),
-                connection.isRunningUpgrade(), connection.buildingIndex);
+                connection.isRunningUpgrade(), connection.buildingIndex, true);
         this.isAutoCommit = connection.isAutoCommit;
         this.isAutoFlush = connection.isAutoFlush;
         this.sampler = connection.sampler;
@@ -226,7 +235,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
 
     public PhoenixConnection(ConnectionQueryServices services, String url,
             Properties info, PMetaData metaData) throws SQLException {
-        this(services, url, info, metaData, null, false, false, false);
+        this(services, url, info, metaData, null, false, false, false, false);
     }
 
     public PhoenixConnection(PhoenixConnection connection,
@@ -234,16 +243,17 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
                     throws SQLException {
         this(services, connection.url, info, connection.metaData, null,
                 connection.isDescVarLengthRowKeyUpgrade(), connection
-                .isRunningUpgrade(), connection.buildingIndex);
+                .isRunningUpgrade(), connection.buildingIndex, true);
     }
 
     private PhoenixConnection(ConnectionQueryServices services, String url,
             Properties info, PMetaData metaData, MutationState mutationState,
             boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade,
-            boolean buildingIndex) throws SQLException {
+            boolean buildingIndex, boolean isInternalConnection) throws SQLException {
         GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment();
         this.url = url;
         this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade;
+        this.isInternalConnection = isInternalConnection;
 
         // Filter user provided properties based on property policy, if
         // provided and QueryServices.PROPERTY_POLICY_PROVIDER_ENABLED is true
@@ -389,9 +399,13 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         
         this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE,
                 QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE));
+        if(isInternalConnection) {
+            GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.increment();
+        } else {
+            GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
+        }
         this.sourceOfOperation =
                 this.services.getProps().get(QueryServices.SOURCE_OPERATION_ATTRIB, null);
-        GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
     }
 
     private static void checkScn(Long scnParam) throws SQLException {
@@ -444,6 +458,26 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         return result.build();
     }
 
+    public boolean isInternalConnection() {
+        return isInternalConnection;
+    }
+
+    /**
+     * This method, and *only* this method is thread safe
+     * @param connection
+     */
+    public void addChildConnection(PhoenixConnection connection) {
+        //double check for performance
+        if(childConnections == null) {
+            synchronized (queueCreationLock) {
+                if (childConnections == null) {
+                    childConnections = new ConcurrentLinkedQueue<>();
+                }
+            }
+        }
+        childConnections.add(connection);
+    }
+
     public Sampler<?> getSampler() {
         return this.sampler;
     }
@@ -658,13 +692,22 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
                     traceScope.close();
                 }
                 closeStatements();
+                synchronized (queueCreationLock) {
+                    if (childConnections != null) {
+                        SQLCloseables.closeAllQuietly(childConnections);
+                    }
+                }
             } finally {
                 services.removeConnection(this);
             }
             
         } finally {
             isClosed = true;
-            GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
+            if(isInternalConnection()){
+                GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement();
+            } else {
+                GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
+            }
         }
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index 9ebc1af..dce01d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.INDEX_COMMIT_FAILURE_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
@@ -99,6 +100,7 @@ public enum GlobalClientMetrics {
     GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER),
     GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER),
     GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER),
+    GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS(OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER),
     GLOBAL_QUERY_SERVICES_COUNTER(QUERY_SERVICES_COUNTER),
     GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER),
     GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(PHOENIX_CONNECTIONS_THROTTLED_COUNTER),
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 1396a89..40fcad0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -58,6 +58,7 @@ public enum MetricType {
     WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE),
     RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE),
     OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE),
+    OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER("io", "Number of open internal phoenix connections",LogLevel.OFF, PLong.INSTANCE),
     QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE),
     HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver",LogLevel.OFF, PLong.INSTANCE),
     PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix connections prevented from opening " +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 597ae27..585832e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -339,6 +339,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     @GuardedBy("connectionCountLock")
     private int connectionCount = 0;
+
+    @GuardedBy("connectionCountLock")
+    private int internalConnectionCount = 0;
+
     private final Object connectionCountLock = new Object();
     private final boolean returnSequenceValues ;
 
@@ -370,6 +374,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final boolean isAutoUpgradeEnabled;
     private final AtomicBoolean upgradeRequired = new AtomicBoolean(false);
     private final int maxConnectionsAllowed;
+    private final int maxInternalConnectionsAllowed;
     private final boolean shouldThrottleNumConnections;
     public static final byte[] MUTEX_LOCKED = "MUTEX_LOCKED".getBytes();
 
@@ -455,7 +460,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         this.isAutoUpgradeEnabled = config.getBoolean(AUTO_UPGRADE_ENABLED, QueryServicesOptions.DEFAULT_AUTO_UPGRADE_ENABLED);
         this.maxConnectionsAllowed = config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,
             QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS);
-        this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0);
+        this.maxInternalConnectionsAllowed = config.getInt(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,
+                QueryServicesOptions.DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS);
+        this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0) || (maxInternalConnectionsAllowed > 0);
         if (!QueryUtil.isServerConnection(props)) {
             //Start queryDistruptor everytime as log level can be change at connection level as well, but we can avoid starting for server connections.
             try {
@@ -5221,12 +5228,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public void addConnection(PhoenixConnection connection) throws SQLException {
         if (returnSequenceValues || shouldThrottleNumConnections) {
             synchronized (connectionCountLock) {
-                if (shouldThrottleNumConnections && connectionCount + 1 > maxConnectionsAllowed){
-                    GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED).
-                        build().buildException();
+
+                /*
+                 * If we are throttling connections internal connections and client created connections
+                 *   are counted separately against each respective quota.
+                 */
+                if(shouldThrottleNumConnections) {
+                    int futureConnections = 1 + ( connection.isInternalConnection() ? internalConnectionCount : connectionCount);
+                    int allowedConnections = connection.isInternalConnection() ? maxInternalConnectionsAllowed : maxConnectionsAllowed;
+                    if(allowedConnections != 0 && futureConnections > allowedConnections) {
+                        GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
+                        if(connection.isInternalConnection()) {
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED).
+                                    build().buildException();
+                        }
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED).
+                                build().buildException();
+                    }
+                }
+
+                if(!connection.isInternalConnection()) {
+                    connectionCount++;
+                } else {
+                    internalConnectionCount++;
                 }
-                connectionCount++;
             }
         }
         // If lease renewal isn't enabled, these are never cleaned up. Tracking when renewals
@@ -5241,15 +5266,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         if (returnSequenceValues) {
             ConcurrentMap<SequenceKey,Sequence> formerSequenceMap = null;
             synchronized (connectionCountLock) {
-                if (--connectionCount <= 0) {
-                    if (!this.sequenceMap.isEmpty()) {
-                        formerSequenceMap = this.sequenceMap;
-                        this.sequenceMap = Maps.newConcurrentMap();
+                if(!connection.isInternalConnection()) {
+                    if (connectionCount + internalConnectionCount - 1 <= 0) {
+                        if (!this.sequenceMap.isEmpty()) {
+                            formerSequenceMap = this.sequenceMap;
+                            this.sequenceMap = Maps.newConcurrentMap();
+                        }
                     }
                 }
-                if (connectionCount < 0) {
-                    connectionCount = 0;
-                }
             }
             // Since we're using the former sequenceMap, we can do this outside
             // the lock.
@@ -5257,9 +5281,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 // When there are no more connections, attempt to return any sequences
                 returnAllSequences(formerSequenceMap);
             }
-        } else if (shouldThrottleNumConnections){ //still need to decrement connection count
+        }
+        if (returnSequenceValues || shouldThrottleNumConnections){ //still need to decrement connection count
             synchronized (connectionCountLock) {
-                if (connectionCount > 0) {
+                if(connection.isInternalConnection() && internalConnectionCount > 0) {
+                    --internalConnectionCount;
+                } else if (connectionCount > 0) {
                     --connectionCount;
                 }
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index c869f2b..ac9a396 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -280,6 +280,9 @@ public interface QueryServices extends SQLCloseable {
     //max number of connections from a single client to a single cluster. 0 is unlimited.
     public static final String CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS =
         "phoenix.client.connection.max.allowed.connections";
+    //max number of connections from a single client to a single cluster. 0 is unlimited.
+    public static final String INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS =
+            "phoenix.internal.connection.max.allowed.connections";
     public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB  = "phoenix.default.column.encoded.bytes.attrib";
     public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB  = "phoenix.default.immutable.storage.scheme";
     public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB  = "phoenix.default.multitenant.immutable.storage.scheme";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 7bc34e6..85f932b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -320,6 +320,8 @@ public class QueryServicesOptions {
 
     //by default, max connections from one client to one cluster is unlimited
     public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0;
+    //by default, max internal connections from one client to one cluster is unlimited
+    public static final int DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0;
     public static final boolean DEFAULT_STATS_COLLECTION_ENABLED = true;
     public static final boolean DEFAULT_USE_STATS_FOR_PARALLELIZATION = true;