You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2021/01/31 15:26:00 UTC

[phoenix] branch master updated (22f7d41 -> 68239ff)

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git.


    from 22f7d41  PHOENIX-6347 Remove maven-gpg-plugin invoication from release profile
     new e8bf61a  PHOENIX-5872 Close Internal Phoenix Connections that were running during cancel
     new 68239ff  PHOENIX-6078 Remove Internal Phoenix Connections from parent LinkedQueue when closed

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 phoenix-core/pom.xml                               |   4 +
 .../phoenix/query/MaxConcurrentConnectionsIT.java  | 163 +++++++++++++++++++++
 .../apache/phoenix/util/DelayedRegionServer.java   | 124 ++++++++++++++++
 .../compile/MutatingParallelIteratorFactory.java   |   5 +
 .../apache/phoenix/exception/SQLExceptionCode.java |   3 +
 .../org/apache/phoenix/jdbc/PhoenixConnection.java |  83 ++++++++++-
 .../phoenix/monitoring/GlobalClientMetrics.java    |   2 +
 .../org/apache/phoenix/monitoring/MetricType.java  |   1 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |  57 +++++--
 .../org/apache/phoenix/query/QueryServices.java    |   3 +
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +
 11 files changed, 424 insertions(+), 23 deletions(-)
 create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
 create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java


[phoenix] 02/02: PHOENIX-6078 Remove Internal Phoenix Connections from parent LinkedQueue when closed

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 68239ffcb568cc0dbcb45ce7f1aa3f215fd1e652
Author: Saksham Gangwar <sa...@Sakshams-iMac.local>
AuthorDate: Fri Aug 14 16:33:58 2020 -0700

    PHOENIX-6078 Remove Internal Phoenix Connections from parent LinkedQueue when closed
---
 .../phoenix/query/MaxConcurrentConnectionsIT.java  | 41 +++++++++++++++++++---
 .../compile/MutatingParallelIteratorFactory.java   |  4 +++
 .../org/apache/phoenix/jdbc/PhoenixConnection.java | 24 +++++++++++++
 3 files changed, 64 insertions(+), 5 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
index 7da276c..611ef89 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.util.DelayedRegionServer;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -31,22 +32,17 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Properties;
 
-import static org.apache.phoenix.exception.SQLExceptionCode.NEW_CONNECTION_THROTTLED;
 import static org.apache.phoenix.exception.SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
 import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS;
 import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS;
-import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED;
 import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -129,4 +125,39 @@ public class MaxConcurrentConnectionsIT extends BaseUniqueNamesOwnClusterIT {
         }
 
     }
+
+    @Test public void testClosedChildConnectionsRemovedFromParentQueue() throws SQLException {
+        String tableName = generateUniqueName();
+        String connectionUrl = getUniqueUrl();
+        int NUMBER_OF_ROWS = 10;
+        String ddl = "CREATE TABLE " + tableName + " (V BIGINT PRIMARY KEY, K BIGINT)";
+        Properties props = new Properties();
+        props.setProperty(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(10));
+        props.setProperty(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(10));
+        try (Connection conn = DriverManager.getConnection(connectionUrl, props);
+                Statement statement = conn.createStatement()) {
+            statement.execute(ddl);
+        }
+        PhoenixConnection
+                connection =
+                (PhoenixConnection) DriverManager.getConnection(connectionUrl, props);
+        for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+            connection.createStatement()
+                    .execute("UPSERT INTO " + tableName + " VALUES (" + i + ", " + i + ")");
+            connection.commit();
+        }
+        connection.setAutoCommit(false);
+        try {
+            for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+                connection.createStatement()
+                        .execute("DELETE FROM " + tableName + " WHERE K = " + i);
+            }
+        } catch (SQLException e) {
+            fail();
+        } finally {
+            connection.close();
+        }
+        // All 10 child connections should be removed successfully from the queue
+        assertEquals(0, connection.getChildConnectionsCount());
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 7113867..2c5af7a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -119,6 +119,8 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
                         MutatingParallelIteratorFactory.this.connection.getMutationState()
                                 .join(finalState);
                     } finally {
+                        // Remove the about-to-be-closed connection from the parent connection's queue.
+                        connection.removeChildConnection(clonedConnection);
                         clonedConnection.close();
                     }
                 }
@@ -131,6 +133,8 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
         } catch (Throwable ex) {
             // Catch just to make sure we close the cloned connection and then rethrow
             try {
+                // Remove the about-to-be-closed connection from the parent connection's queue.
+                connection.removeChildConnection(clonedConnection);
                 // closeQuietly only handles IOException
                 clonedConnection.close();
             } catch (SQLException sqlEx) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index ae47e7d..dab4c6a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -478,6 +478,30 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         childConnections.add(connection);
     }
 
+    /**
+     * Removes the given child connection from the childConnections queue, if the queue exists.
+     *
+     * @param connection the child connection to remove
+     */
+    public void removeChildConnection(PhoenixConnection connection) {
+        if (childConnections != null) {
+            childConnections.remove(connection);
+        }
+    }
+
+    /**
+     * Returns the number of child connections currently held in the childConnections queue.
+     *
+     * @return the queue size, or 0 if the queue has not been created
+     */
+    @VisibleForTesting
+    public int getChildConnectionsCount() {
+        if (childConnections != null) {
+            return childConnections.size();
+        }
+        return 0;
+    }
+
     public Sampler<?> getSampler() {
         return this.sampler;
     }


[phoenix] 01/02: PHOENIX-5872 Close Internal Phoenix Connections that were running during cancel

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit e8bf61ad4005ccd0aefbe234a38959897ce535c5
Author: Daniel Wong <da...@salesforce.com>
AuthorDate: Mon Apr 27 03:38:54 2020 -0700

    PHOENIX-5872 Close Internal Phoenix Connections that were running during cancel
    
    Signed-off-by: Xinyi Yan <ya...@apache.org>
---
 phoenix-core/pom.xml                               |   4 +
 .../phoenix/query/MaxConcurrentConnectionsIT.java  | 132 +++++++++++++++++++++
 .../apache/phoenix/util/DelayedRegionServer.java   | 124 +++++++++++++++++++
 .../compile/MutatingParallelIteratorFactory.java   |   1 +
 .../apache/phoenix/exception/SQLExceptionCode.java |   3 +
 .../org/apache/phoenix/jdbc/PhoenixConnection.java |  59 +++++++--
 .../phoenix/monitoring/GlobalClientMetrics.java    |   2 +
 .../org/apache/phoenix/monitoring/MetricType.java  |   1 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |  57 ++++++---
 .../org/apache/phoenix/query/QueryServices.java    |   3 +
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +
 11 files changed, 365 insertions(+), 23 deletions(-)

diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 33a8013..7b91753 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -318,6 +318,10 @@
       <groupId>org.apache.hbase.thirdparty</groupId>
       <artifactId>hbase-shaded-miscellaneous</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase.thirdparty</groupId>
+      <artifactId>hbase-shaded-protobuf</artifactId>
+    </dependency>
 
     <!-- HBase test dependencies -->
     <dependency>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
new file mode 100644
index 0000000..7da276c
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.query;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.util.DelayedRegionServer;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.exception.SQLExceptionCode.NEW_CONNECTION_THROTTLED;
+import static org.apache.phoenix.exception.SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED;
+import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Note that some tests for concurrentConnections live in PhoenixMetricsIT.java which also tests the metric emission
+ */
+public class MaxConcurrentConnectionsIT extends BaseUniqueNamesOwnClusterIT {
+
+    private static HBaseTestingUtility hbaseTestUtil;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        hbaseTestUtil = new HBaseTestingUtility();
+
+        hbaseTestUtil.startMiniCluster(1,1,null,null,DelayedRegionServer.class);
+        // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
+        String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+        url = PhoenixRuntime.JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkQuorum +
+                JDBC_PROTOCOL_SEPARATOR + "uniqueConn=A";
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    private String getUniqueUrl() {
+        return url + generateUniqueName();
+    }
+
+    //Have to shut down our special delayed region server
+    @AfterClass
+    public static void tearDown() throws Exception {
+        hbaseTestUtil.shutdownMiniCluster();
+    }
+
+    /**
+     * This tests the delete path which creates an internal phoenix connection per region
+     * @throws Exception
+     */
+    @Test
+    public void testDeleteRuntimeFailureClosesConnections() throws Exception {
+        String tableName = generateUniqueName();
+        String connectionUrl = getUniqueUrl();
+        //table with lots of regions
+        String ddl = "create table " + tableName +  "  (i integer not null primary key, j integer) SALT_BUCKETS=256 ";
+
+        Properties props = new Properties();
+        props.setProperty(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,String.valueOf(10));
+        props.setProperty(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,String.valueOf(10));
+
+        //delay any task handling as that causes additional connections
+        props.setProperty(TASK_HANDLING_INTERVAL_MS_ATTRIB,String.valueOf(600000));
+        props.setProperty(TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,String.valueOf(600000));
+
+        String deleteStmt = "DELETE FROM " + tableName + " WHERE 20 = j";
+
+        try(Connection conn = DriverManager.getConnection(connectionUrl, props); Statement statement = conn.createStatement()) {
+            statement.execute(ddl);
+        }
+
+        assertEquals(0, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue());
+        assertEquals(0, GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue());
+        Connection conn = null;
+        try {
+            conn = DriverManager.getConnection(connectionUrl, props);
+            //Enable delay for the delete
+            DelayedRegionServer.setDelayEnabled(true);
+            try (Statement statement = conn.createStatement()) {
+                statement.execute(deleteStmt);
+            }
+            fail();
+        } catch (SQLException e) {
+            assertEquals(NEW_INTERNAL_CONNECTION_THROTTLED.getErrorCode(), e.getErrorCode());
+            assertEquals(NEW_INTERNAL_CONNECTION_THROTTLED.getSQLState(), e.getSQLState());
+        } finally {
+            DelayedRegionServer.setDelayEnabled(false);
+            if (conn != null) {
+                conn.close();
+            }
+            long connections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue();
+            assertEquals(String.format("Found %d connections still open.", connections),0,connections);
+            connections = GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue();
+            assertEquals(String.format("Found %d internal connections still open.", connections),0 ,connections);
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java b/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java
new file mode 100644
index 0000000..95bf3f4
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * This is an extended MiniHBaseCluster region server which allows a developer/tester to inject
+ * delay into specific server-side operations for testing.
+ */
+public class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DelayedRegionServer.class);
+
+    static boolean doDelay = false;
+    // Activate the delays after table creation to test get/scan/put
+    private static int DELAY_GET = 0;
+    private static int DELAY_SCAN = 30000;
+    private static int DELAY_MUTATE = 0;
+
+    public static void setDelayEnabled(boolean delay) {
+        doDelay = delay;
+    }
+
+    public static void setDelayGet(int delayGet) {
+        DELAY_GET = delayGet;
+    }
+
+    public static void setDelayScan(int delayScan) {
+        DELAY_SCAN = delayScan;
+    }
+
+    public static void setDelayMutate(int delayMutate) {
+        DELAY_MUTATE = delayMutate;
+    }
+
+    public DelayedRegionServer(Configuration conf)
+            throws IOException, InterruptedException {
+        super(conf);
+    }
+
+    @Override protected RSRpcServices createRpcServices() throws IOException {
+        return new DelayedRSRpcServices(this);
+    }
+
+    /**
+     * Injects a delay into RPC calls when delay is enabled, then invokes the super method.
+     */
+    public static class DelayedRSRpcServices extends RSRpcServices {
+
+        DelayedRSRpcServices(HRegionServer rs) throws IOException {
+            super(rs);
+        }
+
+        @Override public GetResponse get(final RpcController controller,
+                final GetRequest request) throws ServiceException {
+            try {
+                if (doDelay) {
+                    Thread.sleep(DELAY_GET);
+                }
+            } catch (InterruptedException e) {
+                LOGGER.error("Sleep interrupted during get operation", e);
+            }
+            return super.get(controller, request);
+        }
+
+        @Override public MutateResponse mutate(final RpcController rpcc,
+                final MutateRequest request) throws ServiceException {
+            try {
+                if (doDelay) {
+                    Thread.sleep(DELAY_MUTATE);
+                }
+            } catch (InterruptedException e) {
+                LOGGER.error("Sleep interrupted during mutate operation", e);
+            }
+            return super.mutate(rpcc, request);
+        }
+
+        @Override public ScanResponse scan(final RpcController controller,
+                ScanRequest request) throws ServiceException {
+            try {
+                if (doDelay) {
+                    Thread.sleep(DELAY_SCAN);
+                }
+            } catch (InterruptedException e) {
+                LOGGER.error("Sleep interrupted during scan operation", e);
+            }
+            return super.scan(controller, request);
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 4aaa72d..7113867 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -66,6 +66,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
             QueryPlan plan) throws SQLException {
 
         final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
+        connection.addChildConnection(clonedConnection);
         try {
             MutationState state = mutate(parentContext, iterator, clonedConnection);
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index bb6d629..17157db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -524,6 +524,9 @@ public enum SQLExceptionCode {
                     info.getMutationSizeBytes());
         }
     }),
+    NEW_INTERNAL_CONNECTION_THROTTLED(731, "410M1", "Could not create connection " +
+            "because the internal connections already has the maximum number" +
+            " of connections to the target cluster."),
     MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED(732,
             "LIM03", "The Phoenix Column size is bigger than maximum " +
             "HBase client key value allowed size for ONE_CELL_PER_COLUMN table, " +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 25f0f3e..ae47e7d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.jdbc;
 
 import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Collections.emptyMap;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
 
@@ -53,6 +54,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -173,6 +175,13 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
     private Double logSamplingRate;
     private String sourceOfOperation;
 
+    private Object queueCreationLock = new Object(); // lock for the lazy init path of childConnections structure
+    private ConcurrentLinkedQueue<PhoenixConnection> childConnections = null;
+
+    //For now just the copy constructor paths will have this as true as I don't want to change the
+    //public interfaces.
+    private final boolean isInternalConnection;
+
     static {
         Tracing.addTraceMetricsSource();
     }
@@ -189,7 +198,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         this(connection.getQueryServices(), connection.getURL(), connection
                 .getClientInfo(), connection.metaData, connection
                 .getMutationState(), isDescRowKeyOrderUpgrade,
-                isRunningUpgrade, connection.buildingIndex);
+                isRunningUpgrade, connection.buildingIndex, true);
         this.isAutoCommit = connection.isAutoCommit;
         this.isAutoFlush = connection.isAutoFlush;
         this.sampler = connection.sampler;
@@ -206,7 +215,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         this(connection.getQueryServices(), connection.getURL(), connection
                 .getClientInfo(), connection.getMetaDataCache(), mutationState,
                 connection.isDescVarLengthRowKeyUpgrade(), connection
-                .isRunningUpgrade(), connection.buildingIndex);
+                .isRunningUpgrade(), connection.buildingIndex, true);
     }
 
     public PhoenixConnection(PhoenixConnection connection, long scn)
@@ -217,7 +226,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
 	public PhoenixConnection(PhoenixConnection connection, Properties props) throws SQLException {
         this(connection.getQueryServices(), connection.getURL(), props, connection.metaData, connection
                 .getMutationState(), connection.isDescVarLengthRowKeyUpgrade(),
-                connection.isRunningUpgrade(), connection.buildingIndex);
+                connection.isRunningUpgrade(), connection.buildingIndex, true);
         this.isAutoCommit = connection.isAutoCommit;
         this.isAutoFlush = connection.isAutoFlush;
         this.sampler = connection.sampler;
@@ -226,7 +235,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
 
     public PhoenixConnection(ConnectionQueryServices services, String url,
             Properties info, PMetaData metaData) throws SQLException {
-        this(services, url, info, metaData, null, false, false, false);
+        this(services, url, info, metaData, null, false, false, false, false);
     }
 
     public PhoenixConnection(PhoenixConnection connection,
@@ -234,16 +243,17 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
                     throws SQLException {
         this(services, connection.url, info, connection.metaData, null,
                 connection.isDescVarLengthRowKeyUpgrade(), connection
-                .isRunningUpgrade(), connection.buildingIndex);
+                .isRunningUpgrade(), connection.buildingIndex, true);
     }
 
     private PhoenixConnection(ConnectionQueryServices services, String url,
             Properties info, PMetaData metaData, MutationState mutationState,
             boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade,
-            boolean buildingIndex) throws SQLException {
+            boolean buildingIndex, boolean isInternalConnection) throws SQLException {
         GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment();
         this.url = url;
         this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade;
+        this.isInternalConnection = isInternalConnection;
 
         // Filter user provided properties based on property policy, if
         // provided and QueryServices.PROPERTY_POLICY_PROVIDER_ENABLED is true
@@ -389,9 +399,13 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         
         this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE,
                 QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE));
+        if(isInternalConnection) {
+            GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.increment();
+        } else {
+            GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
+        }
         this.sourceOfOperation =
                 this.services.getProps().get(QueryServices.SOURCE_OPERATION_ATTRIB, null);
-        GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
     }
 
     private static void checkScn(Long scnParam) throws SQLException {
@@ -444,6 +458,26 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         return result.build();
     }
 
+    public boolean isInternalConnection() {
+        return isInternalConnection;
+    }
+
+    /**
+     * This method, and *only* this method, is thread safe
+     * @param connection
+     */
+    public void addChildConnection(PhoenixConnection connection) {
+        //double check for performance
+        if(childConnections == null) {
+            synchronized (queueCreationLock) {
+                if (childConnections == null) {
+                    childConnections = new ConcurrentLinkedQueue<>();
+                }
+            }
+        }
+        childConnections.add(connection);
+    }
+
     public Sampler<?> getSampler() {
         return this.sampler;
     }
@@ -658,13 +692,22 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
                     traceScope.close();
                 }
                 closeStatements();
+                synchronized (queueCreationLock) {
+                    if (childConnections != null) {
+                        SQLCloseables.closeAllQuietly(childConnections);
+                    }
+                }
             } finally {
                 services.removeConnection(this);
             }
             
         } finally {
             isClosed = true;
-            GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
+            if(isInternalConnection()){
+                GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement();
+            } else {
+                GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
+            }
         }
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index 9ebc1af..dce01d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.INDEX_COMMIT_FAILURE_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
@@ -99,6 +100,7 @@ public enum GlobalClientMetrics {
     GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER),
     GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER),
     GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER),
+    GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS(OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER),
     GLOBAL_QUERY_SERVICES_COUNTER(QUERY_SERVICES_COUNTER),
     GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER),
     GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(PHOENIX_CONNECTIONS_THROTTLED_COUNTER),
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 1396a89..40fcad0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -58,6 +58,7 @@ public enum MetricType {
     WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE),
     RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE),
     OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE),
+    OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER("io", "Number of open internal phoenix connections",LogLevel.OFF, PLong.INSTANCE),
     QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE),
     HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver",LogLevel.OFF, PLong.INSTANCE),
     PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix connections prevented from opening " +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 597ae27..585832e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -339,6 +339,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     @GuardedBy("connectionCountLock")
     private int connectionCount = 0;
+
+    @GuardedBy("connectionCountLock")
+    private int internalConnectionCount = 0;
+
     private final Object connectionCountLock = new Object();
     private final boolean returnSequenceValues ;
 
@@ -370,6 +374,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final boolean isAutoUpgradeEnabled;
     private final AtomicBoolean upgradeRequired = new AtomicBoolean(false);
     private final int maxConnectionsAllowed;
+    private final int maxInternalConnectionsAllowed;
     private final boolean shouldThrottleNumConnections;
     public static final byte[] MUTEX_LOCKED = "MUTEX_LOCKED".getBytes();
 
@@ -455,7 +460,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         this.isAutoUpgradeEnabled = config.getBoolean(AUTO_UPGRADE_ENABLED, QueryServicesOptions.DEFAULT_AUTO_UPGRADE_ENABLED);
         this.maxConnectionsAllowed = config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,
             QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS);
-        this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0);
+        this.maxInternalConnectionsAllowed = config.getInt(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,
+                QueryServicesOptions.DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS);
+        this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0) || (maxInternalConnectionsAllowed > 0);
         if (!QueryUtil.isServerConnection(props)) {
             //Start queryDistruptor everytime as log level can be change at connection level as well, but we can avoid starting for server connections.
             try {
@@ -5221,12 +5228,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public void addConnection(PhoenixConnection connection) throws SQLException {
         if (returnSequenceValues || shouldThrottleNumConnections) {
             synchronized (connectionCountLock) {
-                if (shouldThrottleNumConnections && connectionCount + 1 > maxConnectionsAllowed){
-                    GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED).
-                        build().buildException();
+
+                /*
+                 * If we are throttling connections, internal connections and client-created connections
+                 *   are counted separately, each against its own quota.
+                 */
+                if(shouldThrottleNumConnections) {
+                    int futureConnections = 1 + ( connection.isInternalConnection() ? internalConnectionCount : connectionCount);
+                    int allowedConnections = connection.isInternalConnection() ? maxInternalConnectionsAllowed : maxConnectionsAllowed;
+                    if(allowedConnections != 0 && futureConnections > allowedConnections) {
+                        GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
+                        if(connection.isInternalConnection()) {
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED).
+                                    build().buildException();
+                        }
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED).
+                                build().buildException();
+                    }
+                }
+
+                if(!connection.isInternalConnection()) {
+                    connectionCount++;
+                } else {
+                    internalConnectionCount++;
                 }
-                connectionCount++;
             }
         }
         // If lease renewal isn't enabled, these are never cleaned up. Tracking when renewals
@@ -5241,15 +5266,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         if (returnSequenceValues) {
             ConcurrentMap<SequenceKey,Sequence> formerSequenceMap = null;
             synchronized (connectionCountLock) {
-                if (--connectionCount <= 0) {
-                    if (!this.sequenceMap.isEmpty()) {
-                        formerSequenceMap = this.sequenceMap;
-                        this.sequenceMap = Maps.newConcurrentMap();
+                if(!connection.isInternalConnection()) {
+                    if (connectionCount + internalConnectionCount - 1 <= 0) {
+                        if (!this.sequenceMap.isEmpty()) {
+                            formerSequenceMap = this.sequenceMap;
+                            this.sequenceMap = Maps.newConcurrentMap();
+                        }
                     }
                 }
-                if (connectionCount < 0) {
-                    connectionCount = 0;
-                }
             }
             // Since we're using the former sequenceMap, we can do this outside
             // the lock.
@@ -5257,9 +5281,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 // When there are no more connections, attempt to return any sequences
                 returnAllSequences(formerSequenceMap);
             }
-        } else if (shouldThrottleNumConnections){ //still need to decrement connection count
+        }
+        if (returnSequenceValues || shouldThrottleNumConnections){ //still need to decrement connection count
             synchronized (connectionCountLock) {
-                if (connectionCount > 0) {
+                if(connection.isInternalConnection() && internalConnectionCount > 0) {
+                    --internalConnectionCount;
+                } else if (connectionCount > 0) {
                     --connectionCount;
                 }
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index c869f2b..ac9a396 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -280,6 +280,9 @@ public interface QueryServices extends SQLCloseable {
     //max number of connections from a single client to a single cluster. 0 is unlimited.
     public static final String CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS =
         "phoenix.client.connection.max.allowed.connections";
+    //max number of internal connections from a single client to a single cluster. 0 is unlimited.
+    public static final String INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS =
+            "phoenix.internal.connection.max.allowed.connections";
     public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB  = "phoenix.default.column.encoded.bytes.attrib";
     public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB  = "phoenix.default.immutable.storage.scheme";
     public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB  = "phoenix.default.multitenant.immutable.storage.scheme";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 7bc34e6..85f932b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -320,6 +320,8 @@ public class QueryServicesOptions {
 
     //by default, max connections from one client to one cluster is unlimited
     public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0;
+    //by default, max internal connections from one client to one cluster is unlimited
+    public static final int DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0;
     public static final boolean DEFAULT_STATS_COLLECTION_ENABLED = true;
     public static final boolean DEFAULT_USE_STATS_FOR_PARALLELIZATION = true;