You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/16 16:31:47 UTC

[06/50] [abbrv] phoenix git commit: PHOENIX-1457 Use high priority queue for metadata endpoint calls

PHOENIX-1457 Use high priority queue for metadata endpoint calls


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a7d7dfb5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a7d7dfb5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a7d7dfb5

Branch: refs/heads/calcite
Commit: a7d7dfb52622a586482030ae6904d0c53ed7a4af
Parents: b256fde
Author: Thomas D'Silva <tw...@gmail.com>
Authored: Tue Mar 24 17:17:44 2015 -0700
Committer: Thomas <td...@salesforce.com>
Committed: Fri Mar 27 11:45:41 2015 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/index/IndexHandlerIT.java   |  12 +-
 .../phoenix/end2end/index/IndexQosIT.java       | 243 -------------------
 .../apache/phoenix/rpc/PhoenixClientRpcIT.java  | 122 ++++++++++
 .../apache/phoenix/rpc/PhoenixServerRpcIT.java  | 235 ++++++++++++++++++
 .../TestPhoenixIndexRpcSchedulerFactory.java    |  64 +++++
 .../hbase/ipc/PhoenixIndexRpcScheduler.java     | 123 ----------
 .../hadoop/hbase/ipc/PhoenixRpcScheduler.java   | 123 ++++++++++
 .../hbase/ipc/PhoenixRpcSchedulerFactory.java   |  95 ++++++++
 .../controller/ClientRpcControllerFactory.java  |  60 +++++
 .../ipc/controller/IndexRpcController.java      |  51 ++++
 .../ipc/controller/MetadataRpcController.java   |  55 +++++
 .../controller/ServerRpcControllerFactory.java  |  62 +++++
 .../index/IndexQosRpcControllerFactory.java     |  82 -------
 .../ipc/PhoenixIndexRpcSchedulerFactory.java    |  90 -------
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   4 -
 .../org/apache/phoenix/query/QueryServices.java |   5 +-
 .../phoenix/query/QueryServicesOptions.java     |  12 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |   7 -
 .../hbase/ipc/PhoenixIndexRpcSchedulerTest.java |  16 +-
 .../PhoenixIndexRpcSchedulerFactoryTest.java    | 106 --------
 .../PhoenixRpcSchedulerFactoryTest.java         | 125 ++++++++++
 .../java/org/apache/phoenix/query/BaseTest.java |  12 +-
 22 files changed, 1023 insertions(+), 681 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
index 1507d6b..20a780a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
@@ -35,8 +35,8 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory;
 import org.apache.phoenix.hbase.index.TableName;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.junit.After;
@@ -53,11 +53,11 @@ public class IndexHandlerIT {
 
     public static class CountingIndexClientRpcFactory extends RpcControllerFactory {
 
-        private IndexQosRpcControllerFactory delegate;
+        private ServerRpcControllerFactory delegate;
 
         public CountingIndexClientRpcFactory(Configuration conf) {
             super(conf);
-            this.delegate = new IndexQosRpcControllerFactory(conf);
+            this.delegate = new ServerRpcControllerFactory(conf);
         }
 
         @Override
@@ -146,8 +146,8 @@ public class IndexHandlerIT {
         conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
             CountingIndexClientRpcFactory.class.getName());
         // and set the index table as the current table
-        conf.setStrings(IndexQosRpcControllerFactory.INDEX_TABLE_NAMES_KEY,
-            TestTable.getTableNameString());
+//        conf.setStrings(PhoenixRpcControllerFactory.INDEX_TABLE_NAMES_KEY,
+//            TestTable.getTableNameString());
         HTable table = new HTable(conf, TestTable.getTableName());
 
         // do a write to the table
@@ -159,7 +159,7 @@ public class IndexHandlerIT {
         // check the counts on the rpc controller
         assertEquals("Didn't get the expected number of index priority writes!", 1,
             (int) CountingIndexClientRpcController.priorityCounts
-                    .get(QueryServicesOptions.DEFAULT_INDEX_MIN_PRIORITY));
+                    .get(QueryServicesOptions.DEFAULT_INDEX_PRIORITY));
 
         table.close();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexQosIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexQosIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexQosIT.java
deleted file mode 100644
index bab8f38..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexQosIT.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import static org.apache.phoenix.util.TestUtil.LOCALHOST;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.ipc.BalancedQueueRpcExecutor;
-import org.apache.hadoop.hbase.ipc.CallRunner;
-import org.apache.hadoop.hbase.ipc.PhoenixIndexRpcScheduler;
-import org.apache.hadoop.hbase.ipc.PriorityFunction;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.ipc.RpcExecutor;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory;
-import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory;
-import org.apache.phoenix.jdbc.PhoenixTestDriver;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class IndexQosIT extends BaseTest {
-
-    private static final String SCHEMA_NAME = "S";
-    private static final String INDEX_TABLE_NAME = "I";
-    private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
-    private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I");
-    private static final int NUM_SLAVES = 2;
-
-    private static String url;
-    private static PhoenixTestDriver driver;
-    private HBaseTestingUtility util;
-    private HBaseAdmin admin;
-    private Configuration conf;
-    private static RpcExecutor spyRpcExecutor = Mockito.spy(new BalancedQueueRpcExecutor("test-queue", 30, 1, 300));
-
-    /**
-     * Factory that uses a spyed RpcExecutor
-     */
-    public static class TestPhoenixIndexRpcSchedulerFactory extends PhoenixIndexRpcSchedulerFactory {
-        @Override
-        public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction, Abortable abortable) {
-            PhoenixIndexRpcScheduler phoenixIndexRpcScheduler = (PhoenixIndexRpcScheduler)super.create(conf, priorityFunction, abortable);
-            phoenixIndexRpcScheduler.setExecutorForTesting(spyRpcExecutor);
-            return phoenixIndexRpcScheduler;
-        }
-    }
-
-    @Before
-    public void doSetup() throws Exception {
-        conf = HBaseConfiguration.create();
-        setUpConfigForMiniCluster(conf);
-        conf.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
-                TestPhoenixIndexRpcSchedulerFactory.class.getName());
-        conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, IndexQosRpcControllerFactory.class.getName());
-        util = new HBaseTestingUtility(conf);
-        // start cluster with 2 region servers
-        util.startMiniCluster(NUM_SLAVES);
-        admin = util.getHBaseAdmin();
-        String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
-        url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
-                + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
-        driver = initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        try {
-            destroyDriver(driver);
-            if (admin!=null) {
-            	admin.close();
-            }
-        } finally {
-            util.shutdownMiniCluster();
-        }
-    }
-    
-    @Test
-    public void testIndexWriteQos() throws Exception { 
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = driver.connect(url, props);
-
-        // create the table 
-        conn.createStatement().execute(
-                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-
-        // create the index 
-        conn.createStatement().execute(
-                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
-
-        byte[] dataTableName = Bytes.toBytes(DATA_TABLE_FULL_NAME);
-        byte[] indexTableName = Bytes.toBytes(INDEX_TABLE_FULL_NAME);
-        MiniHBaseCluster cluster = util.getHBaseCluster();
-        HMaster master = cluster.getMaster();
-        AssignmentManager am = master.getAssignmentManager();
-
-        // verify there is only a single region for data table
-        List<HRegionInfo> tableRegions = admin.getTableRegions(dataTableName);
-        assertEquals("Expected single region for " + dataTableName, tableRegions.size(), 1);
-        HRegionInfo dataHri = tableRegions.get(0);
-
-        // verify there is only a single region for index table
-        tableRegions = admin.getTableRegions(indexTableName);
-        HRegionInfo indexHri = tableRegions.get(0);
-        assertEquals("Expected single region for " + indexTableName, tableRegions.size(), 1);
-
-        ServerName dataServerName = am.getRegionStates().getRegionServerOfRegion(dataHri);
-        ServerName indexServerName = am.getRegionStates().getRegionServerOfRegion(indexHri);
-
-        // if data table and index table are on same region server, move the index table to the other region server
-        if (dataServerName.equals(indexServerName)) {
-            HRegionServer server1 = util.getHBaseCluster().getRegionServer(0);
-            HRegionServer server2 = util.getHBaseCluster().getRegionServer(1);
-            HRegionServer dstServer = null;
-            HRegionServer srcServer = null;
-            if (server1.getServerName().equals(indexServerName)) {
-                dstServer = server2;
-                srcServer = server1;
-            } else {
-                dstServer = server1;
-                srcServer = server2;
-            }
-            byte[] encodedRegionNameInBytes = indexHri.getEncodedNameAsBytes();
-            admin.move(encodedRegionNameInBytes, Bytes.toBytes(dstServer.getServerName().getServerName()));
-            while (dstServer.getOnlineRegion(indexHri.getRegionName()) == null
-                    || dstServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes)
-                    || srcServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes)
-                    || master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
-                // wait for the move to be finished
-                Thread.sleep(1);
-            }
-        }
-
-        dataHri = admin.getTableRegions(dataTableName).get(0);
-        dataServerName = am.getRegionStates().getRegionServerOfRegion(dataHri);
-        indexHri = admin.getTableRegions(indexTableName).get(0);
-        indexServerName = am.getRegionStates().getRegionServerOfRegion(indexHri);
-
-        // verify index and data tables are on different servers
-        assertNotEquals("Index and Data table should be on different region servers dataServer " + dataServerName
-                + " indexServer " + indexServerName, dataServerName, indexServerName);
-
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "k1");
-        stmt.setString(2, "v1");
-        stmt.setString(3, "v2");
-        stmt.execute();
-        conn.commit();
-
-        // run select query that should use the index
-        String selectSql = "SELECT k, v2 from " + DATA_TABLE_FULL_NAME + " WHERE v1=?";
-        stmt = conn.prepareStatement(selectSql);
-        stmt.setString(1, "v1");
-
-        // verify that the query does a range scan on the index table
-        ResultSet rs = stmt.executeQuery("EXPLAIN " + selectSql);
-        assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER S.I ['v1']", QueryUtil.getExplainPlan(rs));
-
-        // verify that the correct results are returned
-        rs = stmt.executeQuery();
-        assertTrue(rs.next());
-        assertEquals("k1", rs.getString(1));
-        assertEquals("v2", rs.getString(2));
-        assertFalse(rs.next());
-        
-        // drop index table 
-        conn.createStatement().execute(
-                "DROP INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME );
-        // create a data table with the same name as the index table 
-        conn.createStatement().execute(
-                "CREATE TABLE " + INDEX_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-        
-        // upsert one row to the table (which has the same table name as the previous index table)
-        stmt = conn.prepareStatement("UPSERT INTO " + INDEX_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "k1");
-        stmt.setString(2, "v1");
-        stmt.setString(3, "v2");
-        stmt.execute();
-        conn.commit();
-        
-        // run select query on the new table
-        selectSql = "SELECT k, v2 from " + INDEX_TABLE_FULL_NAME + " WHERE v1=?";
-        stmt = conn.prepareStatement(selectSql);
-        stmt.setString(1, "v1");
-
-        // verify that the correct results are returned
-        rs = stmt.executeQuery();
-        assertTrue(rs.next());
-        assertEquals("k1", rs.getString(1));
-        assertEquals("v2", rs.getString(2));
-        assertFalse(rs.next());
-        
-        // verify that that index queue is used only once (for the first upsert)
-        Mockito.verify(spyRpcExecutor).dispatch(Mockito.any(CallRunner.class));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java
new file mode 100644
index 0000000..c079a30
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.rpc;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.ipc.CallRunner;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Maps;
+
+public class PhoenixClientRpcIT extends BaseOwnClusterHBaseManagedTimeIT {
+
+    private static final String SCHEMA_NAME = "S";
+    private static final String INDEX_TABLE_NAME = "I";
+    private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+        serverProps.put(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+                TestPhoenixIndexRpcSchedulerFactory.class.getName());
+        serverProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ServerRpcControllerFactory.class.getName());
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        clientProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ClientRpcControllerFactory.class.getName());
+        NUM_SLAVES_BASE = 2;
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet()
+                .iterator()));
+    }
+    
+    @AfterClass
+    public static void doTeardown() throws Exception {
+        TestPhoenixIndexRpcSchedulerFactory.reset();
+    }
+
+    @Test
+    public void testIndexQos() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = driver.connect(getUrl(), props);
+        try {
+            // create the table
+            conn.createStatement().execute(
+                    "CREATE TABLE " + DATA_TABLE_FULL_NAME
+                            + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true");
+
+            // create the index
+            conn.createStatement().execute(
+                    "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+            stmt.setString(1, "k1");
+            stmt.setString(2, "v1");
+            stmt.setString(3, "v2");
+            stmt.execute();
+            conn.commit();
+
+            // run select query that should use the index
+            String selectSql = "SELECT k, v2 from " + DATA_TABLE_FULL_NAME + " WHERE v1=?";
+            stmt = conn.prepareStatement(selectSql);
+            stmt.setString(1, "v1");
+
+            // verify that the query does a range scan on the index table
+            ResultSet rs = stmt.executeQuery("EXPLAIN " + selectSql);
+            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER S.I ['v1']", QueryUtil.getExplainPlan(rs));
+
+            // verify that the correct results are returned
+            rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("k1", rs.getString(1));
+            assertEquals("v2", rs.getString(2));
+            assertFalse(rs.next());
+
+            // verify that the index queue is not used (since the index writes originate from a client and not a region server)
+            Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.never()).dispatch(Mockito.any(CallRunner.class));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testMetadataQos() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = driver.connect(getUrl(), props);
+        try {
+            // create the table
+            conn.createStatement().execute("CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR)");
+            // verify that the metadata queue is used at least once
+            Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getMetadataRpcExecutor(), Mockito.atLeastOnce()).dispatch(Mockito.any(CallRunner.class));
+        } finally {
+            conn.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
new file mode 100644
index 0000000..de0ab84
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.rpc;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.ipc.CallRunner;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Maps;
+
+public class PhoenixServerRpcIT extends BaseOwnClusterHBaseManagedTimeIT {
+
+    private static final String SCHEMA_NAME = "S";
+    private static final String INDEX_TABLE_NAME = "I";
+    private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
+    private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I");
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+        serverProps.put(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+                TestPhoenixIndexRpcSchedulerFactory.class.getName());
+        serverProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ServerRpcControllerFactory.class.getName());
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        clientProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, RpcControllerFactory.class.getName());
+        NUM_SLAVES_BASE = 2;
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+    
+    @AfterClass
+    public static void doTeardown() throws Exception {
+        TestPhoenixIndexRpcSchedulerFactory.reset();
+    }
+    
+    @Test
+    public void testIndexQos() throws Exception { 
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = driver.connect(getUrl(), props);
+        try {
+            // create the table 
+            conn.createStatement().execute(
+                    "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+    
+            // create the index 
+            conn.createStatement().execute(
+                    "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+
+            ensureTablesOnDifferentRegionServers(DATA_TABLE_FULL_NAME, INDEX_TABLE_FULL_NAME);
+    
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+            stmt.setString(1, "k1");
+            stmt.setString(2, "v1");
+            stmt.setString(3, "v2");
+            stmt.execute();
+            conn.commit();
+    
+            // run select query that should use the index
+            String selectSql = "SELECT k, v2 from " + DATA_TABLE_FULL_NAME + " WHERE v1=?";
+            stmt = conn.prepareStatement(selectSql);
+            stmt.setString(1, "v1");
+    
+            // verify that the query does a range scan on the index table
+            ResultSet rs = stmt.executeQuery("EXPLAIN " + selectSql);
+            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER S.I ['v1']", QueryUtil.getExplainPlan(rs));
+    
+            // verify that the correct results are returned
+            rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("k1", rs.getString(1));
+            assertEquals("v2", rs.getString(2));
+            assertFalse(rs.next());
+            
+            // drop index table 
+            conn.createStatement().execute(
+                    "DROP INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME );
+            // create a data table with the same name as the index table 
+            conn.createStatement().execute(
+                    "CREATE TABLE " + INDEX_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            
+            // upsert one row to the table (which has the same table name as the previous index table)
+            stmt = conn.prepareStatement("UPSERT INTO " + INDEX_TABLE_FULL_NAME + " VALUES(?,?,?)");
+            stmt.setString(1, "k1");
+            stmt.setString(2, "v1");
+            stmt.setString(3, "v2");
+            stmt.execute();
+            conn.commit();
+            
+            // run select query on the new table
+            selectSql = "SELECT k, v2 from " + INDEX_TABLE_FULL_NAME + " WHERE v1=?";
+            stmt = conn.prepareStatement(selectSql);
+            stmt.setString(1, "v1");
+    
+            // verify that the correct results are returned
+            rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("k1", rs.getString(1));
+            assertEquals("v2", rs.getString(2));
+            assertFalse(rs.next());
+            
+            // verify that the index queue is used only once (for the first upsert)
+            Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
+        }
+        finally {
+            conn.close();
+        }
+    }
+
+	/**
+	 * Verifies that the given tables each have a single region and are on
+	 * different region servers. If they are on the same server, moves tableName2
+	 * to the other region server.
+	 */
+	private void ensureTablesOnDifferentRegionServers(String tableName1, String tableName2) throws Exception  {
+		byte[] table1 = Bytes.toBytes(tableName1);
+		byte[] table2 = Bytes.toBytes(tableName2);
+		HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TEST_PROPERTIES).getAdmin();
+		HBaseTestingUtility util = getUtility();
+		MiniHBaseCluster cluster = util.getHBaseCluster();
+		HMaster master = cluster.getMaster();
+		AssignmentManager am = master.getAssignmentManager();
+   
+		// verify there is only a single region for data table
+		List<HRegionInfo> tableRegions = admin.getTableRegions(table1);
+		assertEquals("Expected single region for " + table1, tableRegions.size(), 1);
+		HRegionInfo hri1 = tableRegions.get(0);
+   
+		// verify there is only a single region for index table
+		tableRegions = admin.getTableRegions(table2);
+		HRegionInfo hri2 = tableRegions.get(0);
+		assertEquals("Expected single region for " + table2, tableRegions.size(), 1);
+   
+		ServerName serverName1 = am.getRegionStates().getRegionServerOfRegion(hri1);
+		ServerName serverName2 = am.getRegionStates().getRegionServerOfRegion(hri2);
+   
+		// if data table and index table are on same region server, move the index table to the other region server
+		if (serverName1.equals(serverName2)) {
+		    HRegionServer server1 = util.getHBaseCluster().getRegionServer(0);
+		    HRegionServer server2 = util.getHBaseCluster().getRegionServer(1);
+		    HRegionServer dstServer = null;
+		    HRegionServer srcServer = null;
+		    if (server1.getServerName().equals(serverName2)) {
+		        dstServer = server2;
+		        srcServer = server1;
+		    } else {
+		        dstServer = server1;
+		        srcServer = server2;
+		    }
+		    byte[] encodedRegionNameInBytes = hri2.getEncodedNameAsBytes();
+		    admin.move(encodedRegionNameInBytes, Bytes.toBytes(dstServer.getServerName().getServerName()));
+		    while (dstServer.getOnlineRegion(hri2.getRegionName()) == null
+		            || dstServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes)
+		            || srcServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes)
+		            || master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+		        // wait for the move to be finished
+		        Thread.sleep(1);
+		    }
+		}
+   
+		hri1 = admin.getTableRegions(table1).get(0);
+		serverName1 = am.getRegionStates().getRegionServerOfRegion(hri1);
+		hri2 = admin.getTableRegions(table2).get(0);
+		serverName2 = am.getRegionStates().getRegionServerOfRegion(hri2);
+
+		// verify index and data tables are on different servers
+		assertNotEquals("Tables " + tableName1 + " and " + tableName2 + " should be on different region servers", serverName1, serverName2);
+	}
+    
+    @Test
+    public void testMetadataQos() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = driver.connect(getUrl(), props);
+        try {
+        	ensureTablesOnDifferentRegionServers(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
+            // create the table
+            conn.createStatement().execute(
+                    "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR)");
+            // query the table so that SYSTEM.STATS will be used (NOTE: this reuses conn, it is not a second connection)
+            conn.createStatement().execute("SELECT * FROM "+DATA_TABLE_FULL_NAME);
+            // verify that the spied metadata executor received exactly one dispatch() call
+            Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getMetadataRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
+        }
+        finally {
+            conn.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/it/java/org/apache/phoenix/rpc/TestPhoenixIndexRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/TestPhoenixIndexRpcSchedulerFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/TestPhoenixIndexRpcSchedulerFactory.java
new file mode 100644
index 0000000..fb29985
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/TestPhoenixIndexRpcSchedulerFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.rpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ipc.BalancedQueueRpcExecutor;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcScheduler;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RpcExecutor;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.mockito.Mockito;
+
+public class TestPhoenixIndexRpcSchedulerFactory extends PhoenixRpcSchedulerFactory {
+    // Mockito spies shared (static) by every scheduler this factory creates, so tests can verify dispatch() calls.
+    private static RpcExecutor indexRpcExecutor = Mockito.spy(new BalancedQueueRpcExecutor("test-index-queue", 30, 1,
+            300));
+    private static RpcExecutor metadataRpcExecutor = Mockito.spy(new BalancedQueueRpcExecutor("test-metataqueue", 30,
+            1, 300));
+    /** Builds the scheduler through the parent factory, then swaps in the spied index and metadata executors. */
+    @Override
+    public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction, Abortable abortable) {
+        PhoenixRpcScheduler phoenixIndexRpcScheduler = (PhoenixRpcScheduler)super.create(conf, priorityFunction, abortable);
+        phoenixIndexRpcScheduler.setIndexExecutorForTesting(indexRpcExecutor);
+        phoenixIndexRpcScheduler.setMetadataExecutorForTesting(metadataRpcExecutor);
+        return phoenixIndexRpcScheduler;
+    }
+    /** Two-argument overload; forwards to the three-argument create with a null Abortable. */
+    @Override
+    public RpcScheduler create(Configuration configuration, PriorityFunction priorityFunction) {
+        return create(configuration, priorityFunction, null);
+    }
+    /** Returns the shared spy installed as the index executor. */
+    public static RpcExecutor getIndexRpcExecutor() {
+        return indexRpcExecutor;
+    }
+    /** Returns the shared spy installed as the metadata executor. */
+    public static RpcExecutor getMetadataRpcExecutor() {
+        return metadataRpcExecutor;
+    }
+    /** Calls Mockito.reset on both spies so interactions recorded by one test do not leak into the next. */
+    public static void reset() {
+        Mockito.reset(metadataRpcExecutor);
+        Mockito.reset(indexRpcExecutor);
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
deleted file mode 100644
index 4709304..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * {@link RpcScheduler} that first checks to see if this is an index update before passing off the
- * call to the delegate {@link RpcScheduler}.
- * <p>
- * We reserve the range (1000, 1050], by default (though it is configurable), for index priority
- * writes. Currently, we don't do any prioritization within that range - all index writes are
- * treated with the same priority and put into the same queue.
- */
-public class PhoenixIndexRpcScheduler extends RpcScheduler {
-
-    // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
-    public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "ipc.server.callqueue.read.share";
-    public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
-            "ipc.server.callqueue.handler.factor";
-    private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
-
-    private RpcScheduler delegate;
-    private int minPriority;
-    private int maxPriority;
-    private RpcExecutor callExecutor;
-    private int port;
-
-    public PhoenixIndexRpcScheduler(int indexHandlerCount, Configuration conf,
-            RpcScheduler delegate, int minPriority, int maxPriority) {
-        int maxQueueLength =
-                conf.getInt("ipc.server.max.callqueue.length", indexHandlerCount
-                        * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
-
-        // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
-        float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
-        int numCallQueues =
-                Math.max(1, Math.round(indexHandlerCount * callQueuesHandlersFactor));
-
-        this.minPriority = minPriority;
-        this.maxPriority = maxPriority;
-        this.delegate = delegate;
-
-        this.callExecutor =
-                new BalancedQueueRpcExecutor("Index", indexHandlerCount, numCallQueues,
-                        maxQueueLength);
-    }
-
-    @Override
-    public void init(Context context) {
-        delegate.init(context);
-        this.port = context.getListenerAddress().getPort();
-    }
-
-    @Override
-    public void start() {
-        delegate.start();
-        callExecutor.start(port);
-    }
-
-    @Override
-    public void stop() {
-        delegate.stop();
-        callExecutor.stop();
-    }
-
-    @Override
-    public void dispatch(CallRunner callTask) throws InterruptedException, IOException {
-        RpcServer.Call call = callTask.getCall();
-        int priority = call.header.getPriority();
-        if (minPriority <= priority && priority < maxPriority) {
-            callExecutor.dispatch(callTask);
-        } else {
-            delegate.dispatch(callTask);
-        }
-    }
-
-    @Override
-    public int getGeneralQueueLength() {
-        // not the best way to calculate, but don't have a better way to hook
-        // into metrics at the moment
-        return this.delegate.getGeneralQueueLength() + this.callExecutor.getQueueLength();
-    }
-
-    @Override
-    public int getPriorityQueueLength() {
-        return this.delegate.getPriorityQueueLength();
-    }
-
-    @Override
-    public int getReplicationQueueLength() {
-        return this.delegate.getReplicationQueueLength();
-    }
-
-    @Override
-    public int getActiveRpcHandlerCount() {
-        return this.delegate.getActiveRpcHandlerCount() + this.callExecutor.getActiveHandlerCount();
-    }
-
-    @VisibleForTesting
-    public void setExecutorForTesting(RpcExecutor executor) {
-        this.callExecutor = executor;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
new file mode 100644
index 0000000..e721271
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link RpcScheduler} that first checks to see if this is an index or metadata call (by exact match on the
+ * call's priority) before passing off the call to the delegate {@link RpcScheduler}.
+ */
+public class PhoenixRpcScheduler extends RpcScheduler {
+
+    // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
+    private static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "ipc.server.callqueue.handler.factor";
+    private static final String CALLQUEUE_LENGTH_CONF_KEY = "ipc.server.max.callqueue.length";
+    private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
+
+    private RpcScheduler delegate; // receives every call that is neither index nor metadata priority
+    private int indexPriority;
+    private int metadataPriority;
+    private RpcExecutor indexCallExecutor;
+    private RpcExecutor metadataCallExecutor;
+    private int port; // listener port captured in init() and handed to the executors in start()
+    /** Creates the "Index" and "Metadata" executors; queue count and queue length are read from conf. */
+    public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority) {
+        // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
+        int maxQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+        float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
+        int numQueues = Math.max(1, Math.round(callQueuesHandlersFactor));
+
+        this.indexPriority = indexPriority;
+        this.metadataPriority = metadataPriority;
+        this.delegate = delegate;
+        this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", 1, numQueues, maxQueueLength);
+        this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", 1, numQueues, maxQueueLength);
+    }
+    /** Initializes the delegate and records the listener port from the context. */
+    @Override
+    public void init(Context context) {
+        delegate.init(context);
+        this.port = context.getListenerAddress().getPort();
+    }
+    /** Starts the delegate, then both Phoenix executors on the recorded port. */
+    @Override
+    public void start() {
+        delegate.start();
+        indexCallExecutor.start(port);
+        metadataCallExecutor.start(port);
+    }
+    /** Stops the delegate, then both Phoenix executors. */
+    @Override
+    public void stop() {
+        delegate.stop();
+        indexCallExecutor.stop();
+        metadataCallExecutor.stop();
+    }
+    /** Routes on the call header priority: index executor, else metadata executor, else the delegate. */
+    @Override
+    public void dispatch(CallRunner callTask) throws InterruptedException, IOException {
+        RpcServer.Call call = callTask.getCall();
+        int priority = call.header.getPriority();
+        if (indexPriority == priority) {
+            indexCallExecutor.dispatch(callTask);
+        } else if (metadataPriority == priority) {
+            metadataCallExecutor.dispatch(callTask);
+        } else {
+            delegate.dispatch(callTask);
+        }
+    }
+    /** Returns the delegate's general queue length plus the index and metadata queue lengths. */
+    @Override
+    public int getGeneralQueueLength() {
+        // not the best way to calculate, but don't have a better way to hook
+        // into metrics at the moment
+        return this.delegate.getGeneralQueueLength() + this.indexCallExecutor.getQueueLength() + this.metadataCallExecutor.getQueueLength();
+    }
+    /** Returns the delegate's priority queue length; the Phoenix queues are not counted here. */
+    @Override
+    public int getPriorityQueueLength() {
+        return this.delegate.getPriorityQueueLength();
+    }
+    /** Returns the delegate's replication queue length; the Phoenix queues are not counted here. */
+    @Override
+    public int getReplicationQueueLength() {
+        return this.delegate.getReplicationQueueLength();
+    }
+    /** Returns the delegate's active handler count plus the active handlers of both Phoenix executors. */
+    @Override
+    public int getActiveRpcHandlerCount() {
+        return this.delegate.getActiveRpcHandlerCount() + this.indexCallExecutor.getActiveHandlerCount() + this.metadataCallExecutor.getActiveHandlerCount();
+    }
+    /** Test hook: replaces the index executor with the given one. */
+    @VisibleForTesting
+    public void setIndexExecutorForTesting(RpcExecutor executor) {
+        this.indexCallExecutor = executor;
+    }
+    /** Test hook: replaces the metadata executor with the given one. */
+    @VisibleForTesting
+    public void setMetadataExecutorForTesting(RpcExecutor executor) {
+        this.metadataCallExecutor = executor;
+    }
+    
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
new file mode 100644
index 0000000..a697382
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Factory to create a {@link PhoenixRpcScheduler}. In this package so we can access the
+ * {@link SimpleRpcSchedulerFactory}.
+ */
+public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixRpcSchedulerFactory.class);
+
+    private static final String VERSION_TOO_OLD_FOR_INDEX_RPC =
+            "Running an older version of HBase (less than 0.98.4), Phoenix index RPC handling cannot be enabled.";
+    /** Wraps a SimpleRpcSchedulerFactory-created delegate in a PhoenixRpcScheduler after validating both priorities. */
+    @Override
+    public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction, Abortable abortable) {
+        // create the delegate scheduler
+        RpcScheduler delegate;
+        try {
+            // happens in <=0.98.4 where the scheduler factory is not visible (NOTE: the log message says "less than 0.98.4" -- TODO confirm which bound is right)
+            delegate = new SimpleRpcSchedulerFactory().create(conf, priorityFunction, abortable);
+        } catch (IllegalAccessError e) {
+            LOG.fatal(VERSION_TOO_OLD_FOR_INDEX_RPC);
+            throw e;
+        }
+
+        // get the index priority configs
+        int indexPriority = getIndexPriority(conf);
+        validatePriority(indexPriority);
+        // get the metadata priority configs
+        int metadataPriority = getMetadataPriority(conf);
+        validatePriority(metadataPriority);
+
+        // validate index and metadata priorities are not the same (the scheduler routes on exact priority match)
+        Preconditions.checkArgument(indexPriority != metadataPriority, "Index and Metadata priority must not be same "+ indexPriority);
+        LOG.info("Using custom Phoenix Index RPC Handling with index rpc priority " + indexPriority + " and metadata rpc priority " + metadataPriority);
+
+        PhoenixRpcScheduler scheduler =
+                new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority);
+        return scheduler;
+    }
+    /** Two-argument overload; forwards to the three-argument create with a null Abortable. */
+    @Override
+    public RpcScheduler create(Configuration configuration, PriorityFunction priorityFunction) {
+        return create(configuration, priorityFunction, null);
+    }
+
+    /**
+     * Validates (via Preconditions.checkArgument) that the priority lies outside the HBase range NORMAL_QOS..HIGH_QOS, inclusive
+     */
+    private void validatePriority(int priority) {
+        Preconditions.checkArgument( priority < HConstants.NORMAL_QOS || priority > HConstants.HIGH_QOS, "priority cannot be within hbase priority range " 
+        			+ HConstants.NORMAL_QOS +" to " + HConstants.HIGH_QOS ); 
+    }
+    /** Reads the index RPC priority from conf, falling back to QueryServicesOptions.DEFAULT_INDEX_PRIORITY. */
+    public static int getIndexPriority(Configuration conf) {
+        return conf.getInt(QueryServices.INDEX_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_PRIORITY);
+    }
+    /** Reads the metadata RPC priority from conf, falling back to QueryServicesOptions.DEFAULT_METADATA_PRIORITY. */
+    public static int getMetadataPriority(Configuration conf) {
+        return conf.getInt(QueryServices.METADATA_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_METADATA_PRIORITY);
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
new file mode 100644
index 0000000..5a7dcc2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * {@link RpcControllerFactory} that sets the priority of metadata rpc calls to be processed
+ * in its own queue, by wrapping every controller it creates in a MetadataRpcController.
+ */
+public class ClientRpcControllerFactory extends RpcControllerFactory {
+
+    public ClientRpcControllerFactory(Configuration conf) {
+        super(conf);
+    }
+    // Each newController overload builds the parent factory's controller and then wraps it via getController.
+    @Override
+    public PayloadCarryingRpcController newController() {
+        PayloadCarryingRpcController delegate = super.newController();
+        return getController(delegate);
+    }
+
+    @Override
+    public PayloadCarryingRpcController newController(CellScanner cellScanner) {
+        PayloadCarryingRpcController delegate = super.newController(cellScanner);
+        return getController(delegate);
+    }
+
+    @Override
+    public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
+        PayloadCarryingRpcController delegate = super.newController(cellIterables);
+        return getController(delegate);
+    }
+    /** Wraps the given controller in a MetadataRpcController built with this factory's conf. */
+    private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) {
+		return new MetadataRpcController(delegate, conf);
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
new file mode 100644
index 0000000..fdb1d33
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+
+class IndexRpcController extends DelegatingPayloadCarryingRpcController {
+    // Controller that assigns the configured index priority to calls on non-system, non-tracing tables.
+    private final int priority;
+    private final String tracingTableName;
+    /** Reads the index priority and the tracing stats table name from conf. */
+    public IndexRpcController(PayloadCarryingRpcController delegate, Configuration conf) {
+        super(delegate);
+        this.priority = PhoenixRpcSchedulerFactory.getIndexPriority(conf);
+        this.tracingTableName = conf.get(QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB,
+                QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME);
+    }
+    /** Uses the index priority unless tn is a system table or the tracing table; those go to super.setPriority(tn). */
+    @Override
+    public void setPriority(final TableName tn) {
+		if (!tn.isSystemTable() && !tn.getNameAsString().equals(tracingTableName)) {
+			setPriority(this.priority);
+		}  
+        else {
+            super.setPriority(tn);
+        }
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
new file mode 100644
index 0000000..23b9f03
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+
+import com.google.common.collect.ImmutableList;
+
+class MetadataRpcController extends DelegatingPayloadCarryingRpcController {
+	// Controller that assigns the configured metadata priority to calls on the Phoenix system tables listed below.
+	private int priority;
+	// full names of the Phoenix tables whose calls get the metadata priority (catalog, stats, sequence)
+	private static final List<String> SYSTEM_TABLE_NAMES = new ImmutableList.Builder<String>()
+			.add(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)
+			.add(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
+			.add(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME).build();
+	/** Reads the metadata priority from conf via PhoenixRpcSchedulerFactory.getMetadataPriority. */
+	public MetadataRpcController(PayloadCarryingRpcController delegate,
+			Configuration conf) {
+		super(delegate);
+		this.priority = PhoenixRpcSchedulerFactory.getMetadataPriority(conf);
+	}
+	/** Uses the metadata priority when tn's full name is in SYSTEM_TABLE_NAMES; otherwise calls super.setPriority(tn). */
+	@Override
+	public void setPriority(final TableName tn) {
+		if (SYSTEM_TABLE_NAMES.contains(tn.getNameAsString())) {
+			setPriority(this.priority);
+		} else {
+			super.setPriority(tn);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerRpcControllerFactory.java
new file mode 100644
index 0000000..8c17eda
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerRpcControllerFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * {@link RpcControllerFactory} whose controllers set the priority of index and metadata rpc
+ * calls so that they are each processed in their own queues. Every controller produced by the
+ * parent factory is passed through {@code getController} before being handed out.
+ */
+public class ServerRpcControllerFactory extends RpcControllerFactory {
+
+    public ServerRpcControllerFactory(Configuration conf) {
+        super(conf);
+    }
+
+    @Override
+    public PayloadCarryingRpcController newController() {
+        return getController(super.newController());
+    }
+
+    @Override
+    public PayloadCarryingRpcController newController(CellScanner cellScanner) {
+        return getController(super.newController(cellScanner));
+    }
+
+    @Override
+    public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
+        return getController(super.newController(cellIterables));
+    }
+
+    /**
+     * Wraps the given controller in an {@link IndexRpcController}, which is in turn wrapped
+     * in a {@link MetadataRpcController}; the outermost (metadata) controller is returned.
+     */
+    private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) {
+        return new MetadataRpcController(new IndexRpcController(delegate, conf), conf);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
deleted file mode 100644
index a192feb..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory;
-import org.apache.phoenix.util.SchemaUtil;
-
-/**
- * {@link RpcControllerFactory} that overrides the standard {@link PayloadCarryingRpcController} to
- * allow the configured index tables (via {@link #INDEX_TABLE_NAMES_KEY}) to use the Index priority.
- */
-public class IndexQosRpcControllerFactory extends RpcControllerFactory {
-
-    public static final String INDEX_TABLE_NAMES_KEY = "phoenix.index.rpc.controller.index-tables";
-
-    public IndexQosRpcControllerFactory(Configuration conf) {
-        super(conf);
-    }
-
-    @Override
-    public PayloadCarryingRpcController newController() {
-        PayloadCarryingRpcController delegate = super.newController();
-        return new IndexQosRpcController(delegate, conf);
-    }
-
-    @Override
-    public PayloadCarryingRpcController newController(CellScanner cellScanner) {
-        PayloadCarryingRpcController delegate = super.newController(cellScanner);
-        return new IndexQosRpcController(delegate, conf);
-    }
-
-    @Override
-    public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
-        PayloadCarryingRpcController delegate = super.newController(cellIterables);
-        return new IndexQosRpcController(delegate, conf);
-    }
-
-    private class IndexQosRpcController extends DelegatingPayloadCarryingRpcController {
-
-        private int priority;
-
-        public IndexQosRpcController(PayloadCarryingRpcController delegate, Configuration conf) {
-            super(delegate);
-            this.priority = PhoenixIndexRpcSchedulerFactory.getMinPriority(conf);
-        }
-        @Override
-        public void setPriority(final TableName tn) {
-            // if its an index table, then we override to the index priority
-            if (!tn.isSystemTable() &&  !SchemaUtil.isSystemDataTable(tn.getNameAsString())) {
-                setPriority(this.priority);
-            } 
-            else {
-                super.setPriority(tn);
-            }
-        }
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java
deleted file mode 100644
index 1789b0e..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.ipc;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ipc.PhoenixIndexRpcScheduler;
-import org.apache.hadoop.hbase.ipc.PriorityFunction;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
-import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Factory to create a {@link PhoenixIndexRpcScheduler}. In this package so we can access the
- * {@link SimpleRpcSchedulerFactory}.
- */
-public class PhoenixIndexRpcSchedulerFactory implements RpcSchedulerFactory {
-
-    private static final Log LOG = LogFactory.getLog(PhoenixIndexRpcSchedulerFactory.class);
-
-    private static final String VERSION_TOO_OLD_FOR_INDEX_RPC =
-            "Running an older version of HBase (less than 0.98.4), Phoenix index RPC handling cannot be enabled.";
-
-    @Override
-    public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction, Abortable abortable) {
-        // create the delegate scheduler
-        RpcScheduler delegate;
-        try {
-            // happens in <=0.98.4 where the scheduler factory is not visible
-            delegate = new SimpleRpcSchedulerFactory().create(conf, priorityFunction, abortable);
-        } catch (IllegalAccessError e) {
-            LOG.fatal(VERSION_TOO_OLD_FOR_INDEX_RPC);
-            throw e;
-        }
-
-        int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
-        int minPriority = getMinPriority(conf);
-        int maxPriority = conf.getInt(QueryServices.MAX_INDEX_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MAX_PRIORITY);
-        // make sure the ranges are outside the warning ranges
-        Preconditions.checkArgument(maxPriority > minPriority, "Max index priority (" + maxPriority
-                + ") must be larger than min priority (" + minPriority + ")");
-        boolean allSmaller =
-                minPriority < HConstants.REPLICATION_QOS
-                        && maxPriority < HConstants.REPLICATION_QOS;
-        boolean allLarger = minPriority > HConstants.HIGH_QOS;
-        Preconditions.checkArgument(allSmaller || allLarger, "Index priority range (" + minPriority
-                + ",  " + maxPriority + ") must be outside HBase priority range ("
-                + HConstants.REPLICATION_QOS + ", " + HConstants.HIGH_QOS + ")");
-
-        LOG.info("Using custom Phoenix Index RPC Handling with " + indexHandlerCount
-                + " handlers and priority range [" + minPriority + ", " + maxPriority + ")");
-
-        PhoenixIndexRpcScheduler scheduler =
-                new PhoenixIndexRpcScheduler(indexHandlerCount, conf, delegate, minPriority,
-                        maxPriority);
-        return scheduler;
-    }
-
-    @Override
-    public RpcScheduler create(Configuration configuration, PriorityFunction priorityFunction) {
-        return create(configuration, priorityFunction, null);
-    }
-
-    public static int getMinPriority(Configuration conf) {
-        return conf.getInt(QueryServices.MIN_INDEX_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MIN_PRIORITY);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 15bcfd0..1b8b57d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -279,10 +279,6 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     /** Version below which we fall back on the generic KeyValueBuilder */
     public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14");
     
-    // list of system tables
-    public static final List<String> SYSTEM_TABLE_NAMES = new ImmutableList.Builder<String>().add(SYSTEM_CATALOG_NAME)
-            .add(SYSTEM_STATS_NAME).add(SEQUENCE_FULLNAME).build();
-    
     PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
         this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new PhoenixStatement(connection));
         this.connection = connection;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 2eab5dd..65f6acf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -123,9 +123,8 @@ public interface QueryServices extends SQLCloseable {
     // Index will be partially re-built from index disable time stamp - following overlap time
     public static final String INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB =
         "phoenix.index.failure.handling.rebuild.overlap.time";
-    public static final String MIN_INDEX_PRIOIRTY_ATTRIB = "phoenix.regionserver.index.priority.min";
-    public static final String MAX_INDEX_PRIOIRTY_ATTRIB = "phoenix.regionserver.index.priority.max";
-    public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.regionserver.index.handler.count";
+    public static final String INDEX_PRIOIRTY_ATTRIB = "phoenix.index.rpc.priority";
+    public static final String METADATA_PRIOIRTY_ATTRIB = "phoenix.metadata.rpc.priority";
     public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex";
 
     // Config parameters for for configuring tracing

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 8cd740a..97040d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -41,6 +41,7 @@ import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LI
 import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.METRICS_ENABLED;
 import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
@@ -61,12 +62,13 @@ import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTR
 import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.METRICS_ENABLED;
 
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.trace.util.Tracing;
@@ -138,13 +140,12 @@ public class QueryServicesOptions {
     public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 10000; // 10 secs
     public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 300000; // 5 mins
 
-    public static final int DEFAULT_INDEX_MAX_PRIORITY = 1050;
     /**
      * HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate
      * and give some room for things in the middle
      */
-    public static final int DEFAULT_INDEX_MIN_PRIORITY = 1000;
-    public static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
+    public static final int DEFAULT_INDEX_PRIORITY = 1000;
+    public static final int DEFAULT_METADATA_PRIORITY = 2000;
     public static final boolean DEFAULT_ALLOW_LOCAL_INDEX = true;
 
     public static final int DEFAULT_TRACING_PAGE_SIZE = 100;
@@ -235,7 +236,8 @@ public class QueryServicesOptions {
             .setIfUnset(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE)
             .setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK)
             .setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK)
-            .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED);
+            .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED)
+            .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ClientRpcControllerFactory.class.getName());
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 4a8341d..46da726 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -404,13 +404,6 @@ public class SchemaUtil {
         return false;
     }
     
-    /**
-     * Returns true if the given table is a system table (does not include future system indexes)
-     */
-    public static boolean isSystemDataTable(String fullTableName) {
-    	return PhoenixDatabaseMetaData.SYSTEM_TABLE_NAMES.contains(fullTableName);
-    }
-
     // Given the splits and the rowKeySchema, find out the keys that 
     public static byte[][] processSplits(byte[][] splits, LinkedHashSet<PColumn> pkColumns, Integer saltBucketNum, boolean defaultRowKeyOrder) throws SQLException {
         // FIXME: shouldn't this return if splits.length == 0?

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
index 8bd8c11..12f1863 100644
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
+++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
@@ -44,9 +44,9 @@ public class PhoenixIndexRpcSchedulerTest {
     public void testIndexPriorityWritesToIndexHandler() throws Exception {
         RpcScheduler mock = Mockito.mock(RpcScheduler.class);
 
-        PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 200, 250);
+        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250);
         BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1, 1);
-        scheduler.setExecutorForTesting(executor);
+        scheduler.setIndexExecutorForTesting(executor);
         dispatchCallWithPriority(scheduler, 200);
         List<BlockingQueue<CallRunner>> queues = executor.getQueues();
         assertEquals(1, queues.size());
@@ -54,8 +54,8 @@ public class PhoenixIndexRpcSchedulerTest {
         queue.poll(20, TimeUnit.SECONDS);
 
         // try again, this time we tweak the ranges we support
-        scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110);
-        scheduler.setExecutorForTesting(executor);
+        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110);
+        scheduler.setIndexExecutorForTesting(executor);
         dispatchCallWithPriority(scheduler, 101);
         queue.poll(20, TimeUnit.SECONDS);
 
@@ -71,14 +71,14 @@ public class PhoenixIndexRpcSchedulerTest {
     @Test
     public void testDelegateWhenOutsideRange() throws Exception {
         RpcScheduler mock = Mockito.mock(RpcScheduler.class);
-        PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 200, 250);
+        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250);
         dispatchCallWithPriority(scheduler, 100);
-        dispatchCallWithPriority(scheduler, 250);
+        dispatchCallWithPriority(scheduler, 251);
 
         // try again, this time we tweak the ranges we support
-        scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110);
+        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110);
         dispatchCallWithPriority(scheduler, 200);
-        dispatchCallWithPriority(scheduler, 110);
+        dispatchCallWithPriority(scheduler, 111);
 
         Mockito.verify(mock, Mockito.times(4)).init(Mockito.any(Context.class));
         Mockito.verify(mock, Mockito.times(4)).dispatch(Mockito.any(CallRunner.class));