Posted to commits@phoenix.apache.org by gj...@apache.org on 2022/07/21 16:08:20 UTC

[phoenix] branch master updated: PHOENIX-6687 The region server hosting the SYSTEM.CATALOG fails to serve any metadata requests as default handler pool threads are exhausted (#1440)

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f4d0863cc PHOENIX-6687 The region server hosting the SYSTEM.CATALOG fails to serve any metadata requests as default handler pool threads are exhausted (#1440)
4f4d0863cc is described below

commit 4f4d0863cca2ea7233aa871484be0763d32f0b33
Author: Jacob Isaac <ja...@gmail.com>
AuthorDate: Thu Jul 21 09:08:15 2022 -0700

    PHOENIX-6687 The region server hosting the SYSTEM.CATALOG fails to serve any metadata requests as default handler pool threads are exhausted (#1440)
    
    Co-authored-by: Jacob Isaac <ji...@salesforce.com>
    Co-authored-by: Geoffrey Jacoby <gj...@apache.org>
---
 .../end2end/ConnectionQueryServicesTestImpl.java   |   5 +
 .../SystemTablesCreationOnConnectionIT.java        |  45 ++++++
 .../hadoop/hbase/ipc/PhoenixRpcScheduler.java      |  44 +++++-
 .../hbase/ipc/PhoenixRpcSchedulerFactory.java      |  11 +-
 .../ipc/controller/MetadataRpcController.java      |   3 +
 .../controller/ServerSideRPCControllerFactory.java |  41 ++++++
 .../controller/ServerToServerRpcController.java    |  42 ++++++
 ...r.java => ServerToServerRpcControllerImpl.java} |  74 +++++-----
 .../query/ChildLinkMetaDataServiceCallBack.java    |  26 +++-
 .../phoenix/query/ConnectionQueryServicesImpl.java | 156 +++++++++++++--------
 .../org/apache/phoenix/query/QueryServices.java    |   4 +-
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +
 .../java/org/apache/phoenix/util/QueryUtil.java    |   3 +-
 .../hbase/ipc/PhoenixIndexRpcSchedulerTest.java    |  51 ++++++-
 14 files changed, 394 insertions(+), 113 deletions(-)
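At a glance: the patch introduces a dedicated "ServerSide" RPC handler pool plus a server-to-server RpcController, so metadata RPCs issued by region servers themselves (for example while adding child links during view creation) no longer queue behind client traffic in the default handler pool of the region server hosting SYSTEM.CATALOG. The pool is tuned through two new properties added below; the following is an illustrative sketch (not part of the commit, class name made up) of overriding them through the HBase Configuration, using the shipped defaults as values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ServerSideRpcTuningSketch {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            // Priority band for server-to-server Phoenix RPCs (new in this patch, default 500).
            conf.setInt("phoenix.serverside.rpc.priority", 500);
            // Handlers reserved for the new "ServerSide" executor (default 30).
            conf.setInt("phoenix.rpc.serverside.handler.count", 30);
            System.out.println(conf.getInt("phoenix.serverside.rpc.priority", -1));
        }
    }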

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index 7e7b6b1236..4fdf47c538 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -28,6 +28,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.protobuf.RpcController;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.query.ConnectionQueryServicesImpl;
@@ -76,6 +77,10 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl
         super.removeConnection(connection);
     }
 
+    public RpcController getController() {
+        return super.getController();
+    }
+
     @Override
     public void close() throws SQLException {
         try {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
index 76af21346a..3633617e1c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
@@ -49,6 +49,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceNotFoundException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcController;
 import org.apache.phoenix.compat.hbase.CompatUtil;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -63,6 +66,7 @@ import org.apache.phoenix.query.ConnectionQueryServicesImpl;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesTestImpl;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.UpgradeUtil;
 import org.junit.After;
@@ -217,7 +221,48 @@ public class SystemTablesCreationOnConnectionIT {
         assertEquals(1, countUpgradeAttempts);
     }
 
+    // Conditions: isDoNotUpgradePropSet is true
+    // Conditions: IS_SERVER_CONNECTION is true
+    // Expected: We do not create SYSTEM.CATALOG even if this is the first connection to the server
+    @Test
+    public void testGetConnectionOnServer() throws Exception {
+        startMiniClusterWithToggleNamespaceMapping(Boolean.FALSE.toString());
+        verifyCQSIUsingAppropriateRPCContoller(true);
+    }
+
+    // Conditions: isDoNotUpgradePropSet is true
+    // Conditions: IS_SERVER_CONNECTION is false
+    // Expected: We do not create SYSTEM.CATALOG even if this is the first connection to the server
+    @Test
+    public void testGetRegularConnection() throws Exception {
+        startMiniClusterWithToggleNamespaceMapping(Boolean.FALSE.toString());
+        verifyCQSIUsingAppropriateRPCContoller(false);
+    }
+
+    private void verifyCQSIUsingAppropriateRPCContoller(boolean isServerSideConnection) throws Exception {
+        Properties serverSideProperties = new Properties();
+        // Set doNotUpgradeProperty to true
+        UpgradeUtil.doNotUpgradeOnFirstConnection(serverSideProperties);
+        if (isServerSideConnection) {
+            QueryUtil.setServerConnection(serverSideProperties);
+        }
+        PhoenixTestDriver driver = new PhoenixTestDriver(ReadOnlyProps.EMPTY_PROPS);
+
+        ConnectionQueryServices cqsi = driver.getConnectionQueryServices(getJdbcUrl(), serverSideProperties);
+        assertTrue(cqsi instanceof ConnectionQueryServicesTestImpl);
+        ConnectionQueryServicesTestImpl testCQSI = (ConnectionQueryServicesTestImpl)cqsi;
+        if (isServerSideConnection) {
+            assertTrue( testCQSI.getController() instanceof ServerToServerRpcController);
+        } else {
+            assertTrue( testCQSI.getController() instanceof ServerRpcController);
+        }
+        assertTrue(testCQSI.isUpgradeRequired());
+        hbaseTables = getHBaseTables();
+        assertFalse(hbaseTables.contains(PHOENIX_SYSTEM_CATALOG) ||
+                hbaseTables.contains(PHOENIX_NAMESPACE_MAPPED_SYSTEM_CATALOG));
+        assertEquals(0, hbaseTables.size());
 
+    }
     /********************* Testing SYSTEM.CATALOG/SYSTEM:CATALOG creation/upgrade behavior for subsequent connections *********************/
 
     // We are ignoring this test because we aren't testing SYSCAT timestamp anymore if
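The two new tests above exercise both sides of the controller selection: a connection flagged as a server-side connection must end up with a ServerToServerRpcController, while an ordinary connection keeps the plain ServerRpcController. A minimal sketch (illustrative, class name made up) of how a connection gets flagged, using the QueryUtil helper the test calls, which is assumed here to simply set the IS_SERVER_CONNECTION property consumed later in ConnectionQueryServicesImpl:

    import java.util.Properties;
    import org.apache.phoenix.util.QueryUtil;

    public class ServerConnectionFlagSketch {
        public static Properties serverSideProps() {
            Properties props = new Properties();
            // Marks the connection as originating on a region server; ConnectionQueryServicesImpl
            // then builds a ServerSideRPCControllerFactory instead of using ServerRpcController.
            QueryUtil.setServerConnection(props);
            return props;
        }
    }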
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
index b2a7503538..6d816ec17d 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
@@ -40,23 +40,29 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
 
     private int indexPriority;
     private int metadataPriority;
+    private int serverSidePriority;
     private RpcExecutor indexCallExecutor;
     private RpcExecutor metadataCallExecutor;
+    private RpcExecutor serverSideCallExecutor;
     private int port;
     
 
-    public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority, PriorityFunction priorityFunction, Abortable abortable) {
+    public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority, int serversidePriority, PriorityFunction priorityFunction, Abortable abortable) {
         // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
     	int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
-    	int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
+        int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_METADATA_HANDLER_COUNT);
+        int serverSideHandlerCount = conf.getInt(QueryServices.SERVER_SIDE_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_SERVERSIDE_HANDLER_COUNT);
         int maxIndexQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, indexHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
         int maxMetadataQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, metadataHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+        int maxServerSideQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, serverSideHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
 
         this.indexPriority = indexPriority;
         this.metadataPriority = metadataPriority;
+        this.serverSidePriority = serversidePriority;
         this.delegate = delegate;
         this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, maxIndexQueueLength, priorityFunction,conf,abortable);
         this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, maxMetadataQueueLength, priorityFunction,conf,abortable);
+        this.serverSideCallExecutor = new BalancedQueueRpcExecutor("ServerSide", serverSideHandlerCount, maxServerSideQueueLength, priorityFunction,conf,abortable);
     }
 
     @Override
@@ -70,6 +76,7 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
         delegate.start();
         indexCallExecutor.start(port);
         metadataCallExecutor.start(port);
+        serverSideCallExecutor.start(port);
     }
 
     @Override
@@ -77,6 +84,7 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
         delegate.stop();
         indexCallExecutor.stop();
         metadataCallExecutor.stop();
+        serverSideCallExecutor.stop();
     }
 
     @Override
@@ -87,6 +95,8 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
             return indexCallExecutor.dispatch(callTask);
         } else if (metadataPriority == priority) {
             return metadataCallExecutor.dispatch(callTask);
+        } else if (serverSidePriority == priority) {
+            return serverSideCallExecutor.dispatch(callTask);
         } else {
             return delegate.dispatch(callTask);
         }
@@ -101,7 +111,10 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
     public int getGeneralQueueLength() {
         // not the best way to calculate, but don't have a better way to hook
         // into metrics at the moment
-        return this.delegate.getGeneralQueueLength() + this.indexCallExecutor.getQueueLength() + this.metadataCallExecutor.getQueueLength();
+        return this.delegate.getGeneralQueueLength()
+                + this.indexCallExecutor.getQueueLength()
+                + this.metadataCallExecutor.getQueueLength()
+                + this.serverSideCallExecutor.getQueueLength();
     }
 
     @Override
@@ -116,7 +129,10 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
 
     @Override
     public int getActiveRpcHandlerCount() {
-        return this.delegate.getActiveRpcHandlerCount() + this.indexCallExecutor.getActiveHandlerCount() + this.metadataCallExecutor.getActiveHandlerCount();
+        return this.delegate.getActiveRpcHandlerCount()
+                + this.indexCallExecutor.getActiveHandlerCount()
+                + this.metadataCallExecutor.getActiveHandlerCount()
+                + this.serverSideCallExecutor.getActiveHandlerCount();
     }
 
     @Override
@@ -139,6 +155,26 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
         this.metadataCallExecutor = executor;
     }
 
+    @VisibleForTesting
+    public void setServerSideExecutorForTesting(RpcExecutor executor) {
+        this.serverSideCallExecutor = executor;
+    }
+
+    @VisibleForTesting
+    public RpcExecutor getIndexExecutorForTesting() {
+        return this.indexCallExecutor;
+    }
+
+    @VisibleForTesting
+    public RpcExecutor getMetadataExecutorForTesting() {
+        return this.metadataCallExecutor;
+    }
+
+    @VisibleForTesting
+    public RpcExecutor getServerSideExecutorForTesting() {
+        return this.serverSideCallExecutor;
+    }
+
     @Override
     public int getWriteQueueLength() {
         return delegate.getWriteQueueLength();
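With three dedicated executors in place, dispatch() becomes a straight priority match against the configured index, metadata and (new) server-side bands; anything else still falls through to the delegate scheduler. A toy restatement of that routing rule, reduced to a pure function (illustrative only, not the committed code):

    public class DispatchRuleSketch {
        enum Pool { INDEX, METADATA, SERVER_SIDE, DELEGATE }

        // Same decision PhoenixRpcScheduler.dispatch() makes, without the executor plumbing.
        static Pool route(int callPriority, int indexPriority, int metadataPriority,
                          int serverSidePriority) {
            if (callPriority == indexPriority) {
                return Pool.INDEX;
            } else if (callPriority == metadataPriority) {
                return Pool.METADATA;
            } else if (callPriority == serverSidePriority) {
                return Pool.SERVER_SIDE;   // new pool added by this patch
            }
            return Pool.DELEGATE;          // everything else keeps the default scheduler
        }

        public static void main(String[] args) {
            System.out.println(route(500, 1000, 2000, 500));   // SERVER_SIDE with default bands
        }
    }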
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
index 57c0d19685..74adf01573 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
@@ -58,6 +58,9 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
         // get the metadata priority configs
         int metadataPriority = getMetadataPriority(conf);
         validatePriority(metadataPriority);
+        // get the server side priority configs
+        int serverSidePriority = getServerSidePriority(conf);
+        validatePriority(serverSidePriority);
 
         // validate index and metadata priorities are not the same
         Preconditions.checkArgument(indexPriority != metadataPriority, "Index and Metadata priority must not be same "+ indexPriority);
@@ -65,7 +68,7 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
                 + indexPriority + " and metadata rpc priority " + metadataPriority);
 
         PhoenixRpcScheduler scheduler =
-                new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority, priorityFunction,abortable);
+                new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority, serverSidePriority, priorityFunction,abortable);
         return scheduler;
     }
 
@@ -89,5 +92,9 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
     public static int getMetadataPriority(Configuration conf) {
         return conf.getInt(QueryServices.METADATA_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_METADATA_PRIORITY);
     }
-    
+
+    public static int getServerSidePriority(Configuration conf) {
+        return conf.getInt(QueryServices.SERVER_SIDE_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_SERVER_SIDE_PRIORITY);
+    }
+
 }
\ No newline at end of file
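The factory now reads a third priority band and hands it to the scheduler. For context, a sketch of the region-server wiring: the scheduler-factory key below is the setting commonly used in Phoenix secondary-index setup (it predates this patch and is quoted here only for orientation), while getServerSidePriority() is the accessor added above; class name is made up:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;

    public class SchedulerFactoryWiringSketch {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            // Plug the Phoenix scheduler factory into the region server so the
            // Index/Metadata/ServerSide pools exist at all (pre-existing Phoenix setup).
            conf.set("hbase.region.server.rpc.scheduler.factory.class",
                    PhoenixRpcSchedulerFactory.class.getName());
            // New accessor: the priority band routed to the ServerSide executor.
            System.out.println(PhoenixRpcSchedulerFactory.getServerSidePriority(conf));
        }
    }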
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
index ef135f0cb1..2800ca461d 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
@@ -43,6 +43,7 @@ class MetadataRpcController extends DelegatingHBaseRpcController {
 			.add(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
 			.add(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME)
 			.add(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME)
+			.add(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME)
             .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, true)
                     .getNameAsString())
             .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, true)
@@ -51,6 +52,8 @@ class MetadataRpcController extends DelegatingHBaseRpcController {
                     .getNameAsString())
             .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES, true)
                     .getNameAsString())
+			.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES, true)
+					.getNameAsString())
             .build();
 
 	public MetadataRpcController(HBaseRpcController delegate,
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
new file mode 100644
index 0000000000..ba7fb6339d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link RpcControllerFactory} that should only be used when
+ * making server-server remote RPCs to the region servers hosting Phoenix SYSTEM tables.
+ */
+public class ServerSideRPCControllerFactory  {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServerSideRPCControllerFactory.class);
+    protected final Configuration conf;
+
+    public ServerSideRPCControllerFactory(Configuration conf) {
+        this.conf = conf;
+    }
+
+    public ServerToServerRpcController newController() {
+        return new ServerToServerRpcControllerImpl(this.conf);
+    }
+}
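Usage of the new factory is deliberately simple: one controller per outgoing RPC, and the server-side priority is only applied when the target is a Phoenix SYSTEM table. A short sketch (illustrative, not from the commit; class name made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ipc.controller.ServerSideRPCControllerFactory;
    import org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcController;
    import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;

    public class ServerSideControllerUsageSketch {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            ServerSideRPCControllerFactory factory = new ServerSideRPCControllerFactory(conf);
            ServerToServerRpcController controller = factory.newController();
            // setPriority(TableName) applies the server-side priority only for SYSTEM tables.
            controller.setPriority(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME);
            System.out.println("priority for this call: " + controller.getPriority());
        }
    }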
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerToServerRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerToServerRpcController.java
new file mode 100644
index 0000000000..4916168b9d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerToServerRpcController.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.hadoop.hbase.ipc.controller;
+
+import com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+
+public interface ServerToServerRpcController extends RpcController {
+
+    /**
+     * @param priority Priority for this request; should fall roughly in the range
+     *          {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
+     */
+    void setPriority(int priority);
+
+    /**
+     * @param tn Set priority based off the table we are going against.
+     */
+    void setPriority(final TableName tn);
+
+    /**
+     * @return The priority of this request
+     */
+    int getPriority();
+}
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerToServerRpcControllerImpl.java
similarity index 53%
copy from phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
copy to phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerToServerRpcControllerImpl.java
index ef135f0cb1..b6a27cd4b0 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerToServerRpcControllerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,57 +15,61 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+ 
 package org.apache.hadoop.hbase.ipc.controller;
 
-import java.util.List;
-
+import com.google.protobuf.RpcController;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.phoenix.util.SchemaUtil;
 
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
-import com.google.protobuf.RpcController;
+import java.util.List;
 
 /**
- * {@link RpcController} that sets the appropriate priority of RPC calls destined for Phoenix SYSTEM
- * tables
+ * {@link RpcController} that sets the appropriate priority of server-server RPC calls destined
+ * for Phoenix SYSTEM tables.
  */
-class MetadataRpcController extends DelegatingHBaseRpcController {
+public class ServerToServerRpcControllerImpl extends ServerRpcController implements
+        ServerToServerRpcController {
 
-	private int priority;
-	// list of system tables
-	private static final List<String> SYSTEM_TABLE_NAMES = new ImmutableList.Builder<String>()
-			.add(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)
-			.add(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
-			.add(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME)
-			.add(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME)
+    private int priority;
+    // list of system tables that can possibly have server-server rpc's
+    private static final List<String> SYSTEM_TABLE_NAMES = new ImmutableList.Builder<String>()
+            .add(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)
+            .add(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME)
+            .add(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME)
             .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, true)
                     .getNameAsString())
-            .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, true)
-                    .getNameAsString())
-            .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES, true)
+            .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES, true)
                     .getNameAsString())
-            .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES, true)
+            .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME_BYTES, true)
                     .getNameAsString())
             .build();
 
-	public MetadataRpcController(HBaseRpcController delegate,
-			Configuration conf) {
-		super(delegate);
-		this.priority = PhoenixRpcSchedulerFactory.getMetadataPriority(conf);
-	}
+    public ServerToServerRpcControllerImpl(
+            Configuration conf) {
+        super();
+        this.priority = PhoenixRpcSchedulerFactory.getServerSidePriority(conf);
+    }
+
+    @Override
+    public void setPriority(final TableName tn) {
+        if (SYSTEM_TABLE_NAMES.contains(tn.getNameAsString())) {
+            setPriority(this.priority);
+        }
+    }
+
+
+    @Override public void setPriority(int priority) {
+        this.priority = priority;
+    }
 
-	@Override
-	public void setPriority(final TableName tn) {
-		if (SYSTEM_TABLE_NAMES.contains(tn.getNameAsString())) {
-			setPriority(this.priority);
-		} else {
-			super.setPriority(tn);
-		}
-	}
 
+    @Override public int getPriority() {
+        return this.priority;
+    }
 }
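Compared with the MetadataRpcController it was copied from, this implementation narrows the table whitelist to the tables that can actually see server-to-server RPCs (SYSTEM.CATALOG, SYSTEM.CHILD_LINK, SYSTEM.TASK and their namespace-mapped forms) and extends ServerRpcController, so getFailedOn() still works for coprocessor error handling. A small illustrative check of the whitelist behavior (not committed code; class and table names made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcControllerImpl;

    public class SystemTablePrioritySketch {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            ServerToServerRpcControllerImpl controller = new ServerToServerRpcControllerImpl(conf);

            // Whitelisted SYSTEM table: the configured server-side priority is applied.
            controller.setPriority(TableName.valueOf("SYSTEM.CHILD_LINK"));
            System.out.println(controller.getPriority());

            // Hypothetical user table: setPriority(TableName) is a no-op, priority unchanged.
            controller.setPriority(TableName.valueOf("MY_APP.DATA_TABLE"));
            System.out.println(controller.getPriority());
        }
    }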
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ChildLinkMetaDataServiceCallBack.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ChildLinkMetaDataServiceCallBack.java
index 73373ef256..ab16a02c63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ChildLinkMetaDataServiceCallBack.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ChildLinkMetaDataServiceCallBack.java
@@ -18,9 +18,11 @@
 
 package org.apache.phoenix.query;
 
+import com.google.protobuf.RpcController;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos
@@ -41,15 +43,16 @@ class ChildLinkMetaDataServiceCallBack
     implements Batch.Call<ChildLinkMetaDataService, MetaDataResponse> {
 
     private final List<Mutation> childLinkMutations;
+    private final RpcController controller;
 
-    public ChildLinkMetaDataServiceCallBack(List<Mutation> childLinkMutations) {
+    public ChildLinkMetaDataServiceCallBack(RpcController controller, List<Mutation> childLinkMutations) {
+        this.controller = controller;
         this.childLinkMutations = childLinkMutations;
     }
 
     @Override
     public MetaDataResponse call(ChildLinkMetaDataService instance)
             throws IOException {
-        ServerRpcController controller = new ServerRpcController();
         BlockingRpcCallback<MetaDataResponse> rpcCallback =
             new BlockingRpcCallback<>();
         CreateViewAddChildLinkRequest.Builder builder =
@@ -60,9 +63,22 @@ class ChildLinkMetaDataServiceCallBack
         }
         CreateViewAddChildLinkRequest build = builder.build();
         instance.createViewAddChildLink(controller, build, rpcCallback);
-        if (controller.getFailedOn() != null) {
-            throw controller.getFailedOn();
-        }
+        checkForRemoteExceptions(controller);
         return rpcCallback.get();
     }
+
+    private void checkForRemoteExceptions(RpcController controller) throws IOException {
+        if (controller != null) {
+            if (controller instanceof ServerRpcController) {
+                if (((ServerRpcController)controller).getFailedOn() != null) {
+                    throw ((ServerRpcController)controller).getFailedOn();
+                }
+            } else {
+                if (((HBaseRpcController)controller).getFailed() != null) {
+                    throw ((HBaseRpcController)controller).getFailed();
+                }
+            }
+        }
+    }
+
 }
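The callback no longer constructs its own controller: the caller injects one (server-to-server or plain) and failure extraction becomes controller-type aware. The same pattern is added to ConnectionQueryServicesImpl further down; here is a standalone sketch of the helper, mirroring the committed private method with an extra instanceof guard (class name made up):

    import java.io.IOException;
    import com.google.protobuf.RpcController;
    import org.apache.hadoop.hbase.ipc.HBaseRpcController;
    import org.apache.hadoop.hbase.ipc.ServerRpcController;

    public final class RemoteExceptionCheckSketch {
        private RemoteExceptionCheckSketch() { }

        // Surface a remote failure as an IOException regardless of which controller
        // flavor carried the call. ServerToServerRpcControllerImpl extends
        // ServerRpcController, so it takes the first branch.
        public static void checkForRemoteExceptions(RpcController controller) throws IOException {
            if (controller == null) {
                return;
            }
            if (controller instanceof ServerRpcController) {
                IOException failed = ((ServerRpcController) controller).getFailedOn();
                if (failed != null) {
                    throw failed;
                }
            } else if (controller instanceof HBaseRpcController) {
                IOException failed = ((HBaseRpcController) controller).getFailed();
                if (failed != null) {
                    throw failed;
                }
            }
        }
    }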
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 4fc2f1ede3..1a5cb1cf52 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -49,6 +49,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAM
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME;
@@ -63,6 +65,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_FOR_MUTEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
@@ -124,6 +127,7 @@ import java.util.regex.Pattern;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import com.google.protobuf.RpcController;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -152,8 +156,11 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcController;
+import org.apache.hadoop.hbase.ipc.controller.ServerSideRPCControllerFactory;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
@@ -382,6 +389,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final int maxInternalConnectionsAllowed;
     private final boolean shouldThrottleNumConnections;
     public static final byte[] MUTEX_LOCKED = "MUTEX_LOCKED".getBytes(StandardCharsets.UTF_8);
+    private ServerSideRPCControllerFactory serverSideRPCControllerFactory;
 
     private static interface FeatureSupported {
         boolean isSupported(ConnectionQueryServices services);
@@ -434,6 +442,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         // Without making a copy of the configuration we cons up, we lose some of our properties
         // on the server side during testing.
         this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
+        //Set the rpcControllerFactory if it is a server side connnection.
+        boolean isServerSideConnection = config.getBoolean(QueryUtil.IS_SERVER_CONNECTION, false);
+        if (isServerSideConnection) {
+            this.serverSideRPCControllerFactory = new ServerSideRPCControllerFactory(config);
+        }
         // set replication required parameter
         ConfigUtil.setReplicationConfigIfAbsent(this.config);
         this.props = new ReadOnlyProps(this.config.iterator());
@@ -1662,7 +1675,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                     @Override
                                     public GetVersionResponse call(MetaDataService instance)
                                             throws IOException {
-                                        ServerRpcController controller = new ServerRpcController();
+                                        RpcController controller = getController();
                                         BlockingRpcCallback<GetVersionResponse> rpcCallback =
                                                 new BlockingRpcCallback<>();
                                         GetVersionRequest.Builder builder =
@@ -1671,9 +1684,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                                 VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
                                                         PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                                         instance.getVersion(controller, builder.build(), rpcCallback);
-                                        if (controller.getFailedOn() != null) {
-                                            throw controller.getFailedOn();
-                                        }
+                                        checkForRemoteExceptions(controller);
                                         return rpcCallback.get();
                                     }
                                 });
@@ -1784,6 +1795,50 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
     }
 
+    @VisibleForTesting
+    protected RpcController getController() {
+        return getController(SYSTEM_CATALOG_HBASE_TABLE_NAME);
+    }
+
+        /**
+         * If configured to use the server-server metadata handler pool for server side connections,
+         * use the {@link org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcController}
+         * else use the ordinary handler pool {@link ServerRpcController}
+         *
+         * return the rpcController to use
+         * @return
+         */
+    @VisibleForTesting
+    protected RpcController getController(TableName systemTableName) {
+        if (serverSideRPCControllerFactory != null) {
+            ServerToServerRpcController controller = serverSideRPCControllerFactory.newController();
+            controller.setPriority(systemTableName);
+            return controller;
+        } else {
+            return new ServerRpcController();
+        }
+    }
+
+    /**
+     * helper function to return the exception from the RPC
+     * @param controller
+     * @throws IOException
+     */
+
+    private void checkForRemoteExceptions(RpcController controller) throws IOException {
+        if (controller != null) {
+            if (controller instanceof ServerRpcController) {
+                if (((ServerRpcController)controller).getFailedOn() != null) {
+                    throw ((ServerRpcController)controller).getFailedOn();
+                }
+            } else {
+                if (((HBaseRpcController)controller).getFailed() != null) {
+                    throw ((HBaseRpcController)controller).getFailed();
+                }
+            }
+        }
+    }
+
     /**
      * Invoke meta data coprocessor with one retry if the key was found to not be in the regions
      * (due to a table split)
@@ -1812,8 +1867,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
                 Table ht = this.getTable(SchemaUtil.getPhysicalName(systemTableName, this.getProps()).getName());
                 try {
-                   results =
-                            ht.coprocessorService(MetaDataService.class, tableKey, tableKey, callable);
+                    results = ht.coprocessorService(MetaDataService.class, tableKey, tableKey, callable);
 
                     assert(results.size() == 1);
                     MetaDataResponse result = results.values().iterator().next();
@@ -2003,9 +2057,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             // We invoke this using rowKey available in the first element
             // of childLinkMutations.
             final byte[] rowKey = childLinkMutations.get(0).getRow();
+            final RpcController controller = getController(PhoenixDatabaseMetaData.SYSTEM_LINK_HBASE_TABLE_NAME);
             final MetaDataMutationResult result =
                 childLinkMetaDataCoprocessorExec(rowKey,
-                    new ChildLinkMetaDataServiceCallBack(childLinkMutations));
+                    new ChildLinkMetaDataServiceCallBack(controller, childLinkMutations));
 
             switch (result.getMutationCode()) {
                 case UNABLE_TO_CREATE_CHILD_LINK:
@@ -2025,7 +2080,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
             @Override
             public MetaDataResponse call(MetaDataService instance) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
+                RpcController controller = getController();
                 BlockingRpcCallback<MetaDataResponse> rpcCallback =
                         new BlockingRpcCallback<>();
                 CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
@@ -2042,9 +2097,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 }
                 CreateTableRequest build = builder.build();
                 instance.createTable(controller, build, rpcCallback);
-                if (controller.getFailedOn() != null) {
-                    throw controller.getFailedOn();
-                }
+                checkForRemoteExceptions(controller);
                 return rpcCallback.get();
             }
         });
@@ -2061,7 +2114,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
             @Override
             public MetaDataResponse call(MetaDataService instance) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
+                RpcController controller = getController();
                 BlockingRpcCallback<MetaDataResponse> rpcCallback =
                         new BlockingRpcCallback<MetaDataResponse>();
                 GetTableRequest.Builder builder = GetTableRequest.newBuilder();
@@ -2072,9 +2125,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setClientTimestamp(clientTimestamp);
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                 instance.getTable(controller, builder.build(), rpcCallback);
-                if (controller.getFailedOn() != null) {
-                    throw controller.getFailedOn();
-                }
+                checkForRemoteExceptions(controller);
                 return rpcCallback.get();
             }
         });
@@ -2096,7 +2147,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
             @Override
             public MetaDataResponse call(MetaDataService instance) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
+                RpcController controller = getController();
                 BlockingRpcCallback<MetaDataResponse> rpcCallback =
                         new BlockingRpcCallback<MetaDataResponse>();
                 DropTableRequest.Builder builder = DropTableRequest.newBuilder();
@@ -2108,9 +2159,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setCascade(cascade);
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                 instance.dropTable(controller, builder.build(), rpcCallback);
-                if (controller.getFailedOn() != null) {
-                    throw controller.getFailedOn();
-                }
+                checkForRemoteExceptions(controller);
                 return rpcCallback.get();
             }
         });
@@ -2183,7 +2232,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
             @Override
             public MetaDataResponse call(MetaDataService instance) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
+                RpcController controller = getController(SYSTEM_FUNCTION_HBASE_TABLE_NAME);
                 BlockingRpcCallback<MetaDataResponse> rpcCallback =
                         new BlockingRpcCallback<MetaDataResponse>();
                 DropFunctionRequest.Builder builder = DropFunctionRequest.newBuilder();
@@ -2194,12 +2243,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setIfExists(ifExists);
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                 instance.dropFunction(controller, builder.build(), rpcCallback);
-                if (controller.getFailedOn() != null) {
-                    throw controller.getFailedOn();
-                }
+                checkForRemoteExceptions(controller);
                 return rpcCallback.get();
             }
-        }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
+        }, SYSTEM_FUNCTION_NAME_BYTES);
         return result;
     }
 
@@ -2404,7 +2451,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     new Batch.Call<MetaDataService, MetaDataResponse>() {
                 @Override
                 public MetaDataResponse call(MetaDataService instance) throws IOException {
-                    ServerRpcController controller = new ServerRpcController();
+                    RpcController controller = getController();
                     BlockingRpcCallback<MetaDataResponse> rpcCallback =
                             new BlockingRpcCallback<MetaDataResponse>();
                     AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
@@ -2420,9 +2467,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     }
                     builder.setAddingColumns(addingColumns);
                     instance.addColumn(controller, builder.build(), rpcCallback);
-                    if (controller.getFailedOn() != null) {
-                        throw controller.getFailedOn();
-                    }
+                    checkForRemoteExceptions(controller);
                     return rpcCallback.get();
                 }
             });
@@ -3122,7 +3167,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
             @Override
             public MetaDataResponse call(MetaDataService instance) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
+                RpcController controller = getController();
                 BlockingRpcCallback<MetaDataResponse> rpcCallback =
                         new BlockingRpcCallback<MetaDataResponse>();
                 DropColumnRequest.Builder builder = DropColumnRequest.newBuilder();
@@ -3134,9 +3179,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 if (parentTable!=null)
                     builder.setParentTable(PTableImpl.toProto(parentTable));
                 instance.dropColumn(controller, builder.build(), rpcCallback);
-                if (controller.getFailedOn() != null) {
-                    throw controller.getFailedOn();
-                }
+                checkForRemoteExceptions(controller);
                 return rpcCallback.get();
             }
         });
@@ -4937,12 +4980,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         this.getProps()).getName())) {
             try {
                 startTime = EnvironmentEdgeManager.currentTimeMillis();
-                results =
-                        htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
+                results = htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
                                 HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, Long>() {
                                     @Override
                                     public Long call(MetaDataService instance) throws IOException {
-                                        ServerRpcController controller = new ServerRpcController();
+                                        RpcController controller = getController();
                                         BlockingRpcCallback<ClearCacheResponse> rpcCallback =
                                                 new BlockingRpcCallback<ClearCacheResponse>();
                                         ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder();
@@ -4950,13 +4992,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                                 VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
                                                         PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                                         instance.clearCache(controller, builder.build(), rpcCallback);
-                                        if (controller.getFailedOn() != null) {
-                                            throw controller.getFailedOn();
-                                        }
+                                        checkForRemoteExceptions(controller);
                                         return rpcCallback.get().getUnfreedBytes();
                                     }
                                 });
-                TableMetricsManager.updateMetricsForSystemCatalogTableMethod(null, NUM_SYSTEM_TABLE_RPC_SUCCESS, 1);
+                TableMetricsManager.updateMetricsForSystemCatalogTableMethod(null,NUM_SYSTEM_TABLE_RPC_SUCCESS, 1);
             } catch(Throwable e) {
                 TableMetricsManager.updateMetricsForSystemCatalogTableMethod(null, NUM_SYSTEM_TABLE_RPC_FAILURES, 1);
                 throw ServerUtil.parseServerException(e);
@@ -5023,7 +5063,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
             @Override
             public MetaDataResponse call(MetaDataService instance) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
+                RpcController controller = getController();
                 BlockingRpcCallback<MetaDataResponse> rpcCallback =
                         new BlockingRpcCallback<MetaDataResponse>();
                 UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder();
@@ -5033,9 +5073,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 }
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                 instance.updateIndexState(controller, builder.build(), rpcCallback);
-                if (controller.getFailedOn() != null) {
-                    throw controller.getFailedOn();
-                }
+                checkForRemoteExceptions(controller);
                 return rpcCallback.get();
             }
         });
@@ -5295,7 +5333,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         new Batch.Call<MetaDataService, ClearTableFromCacheResponse>() {
                     @Override
                     public ClearTableFromCacheResponse call(MetaDataService instance) throws IOException {
-                        ServerRpcController controller = new ServerRpcController();
+                        RpcController controller = getController();
                         BlockingRpcCallback<ClearTableFromCacheResponse> rpcCallback = new BlockingRpcCallback<ClearTableFromCacheResponse>();
                         ClearTableFromCacheRequest.Builder builder = ClearTableFromCacheRequest.newBuilder();
                         builder.setTenantId(ByteStringer.wrap(tenantId));
@@ -5304,7 +5342,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         builder.setClientTimestamp(clientTS);
                         builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                         instance.clearTableFromCache(controller, builder.build(), rpcCallback);
-                        if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+                        checkForRemoteExceptions(controller);
                         return rpcCallback.get();
                     }
                 });
@@ -5615,7 +5653,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
             @Override
             public MetaDataResponse call(MetaDataService instance) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
+                RpcController controller = getController(SYSTEM_FUNCTION_HBASE_TABLE_NAME);
                 BlockingRpcCallback<MetaDataResponse> rpcCallback =
                         new BlockingRpcCallback<MetaDataResponse>();
                 GetFunctionsRequest.Builder builder = GetFunctionsRequest.newBuilder();
@@ -5627,12 +5665,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setClientTimestamp(clientTimestamp);
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                 instance.getFunctions(controller, builder.build(), rpcCallback);
-                if (controller.getFailedOn() != null) {
-                    throw controller.getFailedOn();
-                }
+                checkForRemoteExceptions(controller);
                 return rpcCallback.get();
             }
-        }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
+        }, SYSTEM_FUNCTION_NAME_BYTES);
 
     }
 
@@ -5642,7 +5678,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
             @Override
             public MetaDataResponse call(MetaDataService instance) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
+                RpcController controller = getController();
                 BlockingRpcCallback<MetaDataResponse> rpcCallback = new BlockingRpcCallback<MetaDataResponse>();
                 GetSchemaRequest.Builder builder = GetSchemaRequest.newBuilder();
                 builder.setSchemaName(schemaName);
@@ -5650,7 +5686,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION,
                         PHOENIX_PATCH_NUMBER));
                 instance.getSchema(controller, builder.build(), rpcCallback);
-                if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+                checkForRemoteExceptions(controller);
                 return rpcCallback.get();
             }
         });
@@ -5672,7 +5708,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
             @Override
             public MetaDataResponse call(MetaDataService instance) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
+                RpcController controller = getController(SYSTEM_FUNCTION_HBASE_TABLE_NAME);
                 BlockingRpcCallback<MetaDataResponse> rpcCallback =
                         new BlockingRpcCallback<MetaDataResponse>();
                 CreateFunctionRequest.Builder builder = CreateFunctionRequest.newBuilder();
@@ -5684,12 +5720,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setReplace(function.isReplace());
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                 instance.createFunction(controller, builder.build(), rpcCallback);
-                if (controller.getFailedOn() != null) {
-                    throw controller.getFailedOn();
-                }
+                checkForRemoteExceptions(controller);
                 return rpcCallback.get();
             }
-        }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
+        }, SYSTEM_FUNCTION_NAME_BYTES);
         return result;
     }
 
@@ -5861,7 +5895,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
             @Override
             public MetaDataResponse call(MetaDataService instance) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
+                RpcController controller = getController();
                 BlockingRpcCallback<MetaDataResponse> rpcCallback = new BlockingRpcCallback<MetaDataResponse>();
                 CreateSchemaRequest.Builder builder = CreateSchemaRequest.newBuilder();
                 for (Mutation m : schemaMutations) {
@@ -5872,7 +5906,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION,
                         PHOENIX_PATCH_NUMBER));
                 instance.createSchema(controller, builder.build(), rpcCallback);
-                if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+                checkForRemoteExceptions(controller);
                 return rpcCallback.get();
             }
         });
@@ -5896,7 +5930,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
             @Override
             public MetaDataResponse call(MetaDataService instance) throws IOException {
-                ServerRpcController controller = new ServerRpcController();
+                RpcController controller = getController();
                 BlockingRpcCallback<MetaDataResponse> rpcCallback = new BlockingRpcCallback<MetaDataResponse>();
                 DropSchemaRequest.Builder builder = DropSchemaRequest.newBuilder();
                 for (Mutation m : schemaMetaData) {
@@ -5907,7 +5941,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION,
                         PHOENIX_PATCH_NUMBER));
                 instance.dropSchema(controller, builder.build(), rpcCallback);
-                if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+                checkForRemoteExceptions(controller);
                 return rpcCallback.get();
             }
         });
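
The hunks above swap direct ServerRpcController construction for the new getController() helper plus a shared checkForRemoteExceptions() check, so coprocessor calls made on behalf of a server-initiated connection can carry the dedicated server-side RPC priority. A minimal sketch of that selection pattern, assuming a hypothetical factory field and method names (the committed wiring lives in ConnectionQueryServicesImpl and the new ServerSideRPCControllerFactory; nothing below is the exact Phoenix code):

    import java.io.IOException;
    import com.google.protobuf.RpcController;
    import org.apache.hadoop.hbase.ipc.ServerRpcController;

    class ControllerSelectionSketch {
        // Hypothetical factory; non-null only for connections opened from a region server.
        private final ServerSideControllerFactory serverSideFactory;

        ControllerSelectionSketch(ServerSideControllerFactory factory) {
            this.serverSideFactory = factory;
        }

        RpcController getController() {
            // Server-to-server call: tag the RPC so it lands on the dedicated handler pool.
            if (serverSideFactory != null) {
                return serverSideFactory.newController();
            }
            // Client-initiated call: keep the stock controller and its default priority.
            return new ServerRpcController();
        }

        void checkForRemoteExceptions(RpcController controller) throws IOException {
            // Only the stock controller exposes getFailedOn(); other controller types are
            // assumed to surface failures through their own accessors.
            if (controller instanceof ServerRpcController
                    && ((ServerRpcController) controller).getFailedOn() != null) {
                throw ((ServerRpcController) controller).getFailedOn();
            }
        }

        // Hypothetical stand-in for the commit's controller factory abstraction.
        interface ServerSideControllerFactory {
            RpcController newController();
        }
    }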
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index e51c680862..406c4a1a65 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -179,6 +179,7 @@ public interface QueryServices extends SQLCloseable {
             "phoenix.index.failure.handling.rebuild.overlap.forward.time";
     public static final String INDEX_PRIOIRTY_ATTRIB = "phoenix.index.rpc.priority";
     public static final String METADATA_PRIOIRTY_ATTRIB = "phoenix.metadata.rpc.priority";
+    public static final String SERVER_SIDE_PRIOIRTY_ATTRIB = "phoenix.serverside.rpc.priority";
     public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex";
 
     // Retries when doing server side writes to SYSTEM.CATALOG
@@ -243,7 +244,8 @@ public interface QueryServices extends SQLCloseable {
     // rpc queue configs
     public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
     public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count";
-    
+    public static final String SERVER_SIDE_HANDLER_COUNT_ATTRIB = "phoenix.rpc.serverside.handler.count";
+
     public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder";
     public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions";
     public static final String COLLECT_REQUEST_LEVEL_METRICS = "phoenix.query.request.metrics.enabled";
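
The two new keys are ordinary Phoenix/HBase configuration properties, alongside the existing index and metadata handler settings. A hedged example of overriding them programmatically (values are arbitrary illustrations, not tuning recommendations; in a real deployment they would normally be set in the region server's hbase-site.xml):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ServerSideRpcConfigExample {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            // Priority tag attached to server-initiated metadata RPCs.
            conf.setInt("phoenix.serverside.rpc.priority", 500);
            // Number of handlers in the dedicated pool that serves those RPCs.
            conf.setInt("phoenix.rpc.serverside.handler.count", 30);
            System.out.println("serverside priority = "
                    + conf.getInt("phoenix.serverside.rpc.priority", -1));
        }
    }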
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 6c8ada9e6e..849f023be6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -224,11 +224,13 @@ public class QueryServicesOptions {
      * HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate
      * and give some room for things in the middle
      */
+    public static final int DEFAULT_SERVER_SIDE_PRIORITY = 500;
     public static final int DEFAULT_INDEX_PRIORITY = 1000;
     public static final int DEFAULT_METADATA_PRIORITY = 2000;
     public static final boolean DEFAULT_ALLOW_LOCAL_INDEX = true;
     public static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
     public static final int DEFAULT_METADATA_HANDLER_COUNT = 30;
+    public static final int DEFAULT_SERVERSIDE_HANDLER_COUNT = 30;
     public static final int DEFAULT_SYSTEM_MAX_VERSIONS = 1;
     public static final boolean DEFAULT_SYSTEM_KEEP_DELETED_CELLS = false;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 5c7816ef3f..b409f056fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -420,7 +420,8 @@ public final class QueryUtil {
     private static Connection getConnection(Properties props, Configuration conf)
             throws SQLException {
         String url = getConnectionUrl(props, conf);
-        LOGGER.info("Creating connection with the jdbc url: " + url);
+        LOGGER.info(String.format("Creating connection with the jdbc url: %s, isServerSide = %s",
+                url, props.getProperty(IS_SERVER_CONNECTION)));
         props = PropertiesUtil.combineProperties(props, conf);
         return DriverManager.getConnection(url, props);
     }
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
index 031c679559..15b83e41b5 100644
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
+++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
@@ -67,7 +67,7 @@ public class PhoenixIndexRpcSchedulerTest {
         RpcScheduler mock = Mockito.mock(RpcScheduler.class);
         PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
         Abortable abortable = new AbortServer();
-        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250,qosFunction,abortable);
+        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, qosFunction,abortable);
         BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1,qosFunction,conf,abortable);
         scheduler.setIndexExecutorForTesting(executor);
         dispatchCallWithPriority(scheduler, 200);
@@ -77,7 +77,7 @@ public class PhoenixIndexRpcSchedulerTest {
         queue.poll(20, TimeUnit.SECONDS);
 
         // try again, this time we tweak the ranges we support
-        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110,qosFunction,abortable);
+        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, qosFunction,abortable);
         scheduler.setIndexExecutorForTesting(executor);
         dispatchCallWithPriority(scheduler, 101);
         queue.poll(20, TimeUnit.SECONDS);
@@ -87,6 +87,49 @@ public class PhoenixIndexRpcSchedulerTest {
         executor.stop();
     }
 
+    @Test
+    public void testServerSideRPCalls() throws Exception {
+        RpcScheduler mock = Mockito.mock(RpcScheduler.class);
+        PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
+        Abortable abortable = new AbortServer();
+        PhoenixRpcScheduler scheduler1 = new PhoenixRpcScheduler(conf, mock, 200, 250, 100, qosFunction,abortable);
+        RpcExecutor executor1  = scheduler1.getServerSideExecutorForTesting();
+        for (int c = 0; c < 10; c++) {
+            dispatchCallWithPriority(scheduler1, 100);
+        }
+        List<BlockingQueue<CallRunner>> queues1 = executor1.getQueues();
+        int numDispatches1 = 0;
+        for (BlockingQueue<CallRunner> queue1 : queues1) {
+            if (queue1.size() > 0) {
+                numDispatches1 += queue1.size();
+                for (int i = 0; i < queue1.size(); i++) {
+                    queue1.poll(20, TimeUnit.SECONDS);
+                }
+            }
+        }
+        assertEquals(10, numDispatches1);
+        scheduler1.stop();
+
+        // try again, with the incorrect executor
+        PhoenixRpcScheduler scheduler2 = new PhoenixRpcScheduler(conf, mock, 101, 110, 50, qosFunction,abortable);
+        RpcExecutor executor2  = scheduler2.getIndexExecutorForTesting();
+        dispatchCallWithPriority(scheduler2, 50);
+        List<BlockingQueue<CallRunner>> queues2 = executor2.getQueues();
+        int numDispatches2 = 0;
+        for (BlockingQueue<CallRunner> queue2 : queues2) {
+            if (queue2.size() > 0) {
+                numDispatches2++;
+                queue2.poll(20, TimeUnit.SECONDS);
+            }
+        }
+        assertEquals(0, numDispatches2);
+        scheduler2.stop();
+
+        Mockito.verify(mock, Mockito.times(numDispatches1+1)).init(Mockito.any(Context.class));
+        //Verify no dispatches to the default delegate handler
+        Mockito.verify(mock, Mockito.times(0)).dispatch(Mockito.any(CallRunner.class));
+    }
+
     /**
      * Test that we delegate to the passed {@link RpcScheduler} when the call priority is outside
      * the index range
@@ -97,12 +140,12 @@ public class PhoenixIndexRpcSchedulerTest {
         PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
         Abortable abortable = new AbortServer();
         RpcScheduler mock = Mockito.mock(RpcScheduler.class);
-        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250,qosFunction,abortable);
+        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, qosFunction,abortable);
         dispatchCallWithPriority(scheduler, 100);
         dispatchCallWithPriority(scheduler, 251);
 
         // try again, this time we tweak the ranges we support
-        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110,qosFunction,abortable);
+        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, qosFunction,abortable);
         dispatchCallWithPriority(scheduler, 200);
         dispatchCallWithPriority(scheduler, 111);