You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/05/26 06:27:31 UTC

git commit: Revert Use higher priority queue for index updates to prevent deadlock

Repository: incubator-phoenix
Updated Branches:
  refs/heads/master edc75a2c2 -> 817380820


Revert Use higher priority queue for index updates to prevent deadlock


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/81738082
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/81738082
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/81738082

Branch: refs/heads/master
Commit: 817380820a34b28406067cc529395fb85ce862b2
Parents: edc75a2
Author: James Taylor <jt...@salesforce.com>
Authored: Sun May 25 21:28:31 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun May 25 21:28:31 2014 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/index/IndexHandlerIT.java   | 166 -------------------
 .../hbase/ipc/PhoenixIndexRpcScheduler.java     | 158 ------------------
 .../PhoenixIndexRpcSchedulerFactory.java        |  75 ---------
 .../index/IndexQosRpcControllerFactory.java     |  95 -----------
 .../index/table/CoprocessorHTableFactory.java   |  70 ++++----
 .../hbase/ipc/PhoenixIndexRpcSchedulerTest.java |  93 -----------
 .../PhoenixIndexRpcSchedulerFactoryTest.java    | 104 ------------
 7 files changed, 29 insertions(+), 732 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
deleted file mode 100644
index 7efaf71..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.PhoenixIndexRpcSchedulerFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory;
-import org.apache.phoenix.hbase.index.TableName;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Comprehensive test that ensures we are adding custom index handlers
- */
-public class IndexHandlerIT {
-
-    public static class CountingIndexClientRpcFactory extends RpcControllerFactory {
-
-        private IndexQosRpcControllerFactory delegate;
-
-        public CountingIndexClientRpcFactory(Configuration conf) {
-            super(conf);
-            this.delegate = new IndexQosRpcControllerFactory(conf);
-        }
-
-        @Override
-        public PayloadCarryingRpcController newController() {
-            PayloadCarryingRpcController controller = delegate.newController();
-            return new CountingIndexClientRpcController(controller);
-        }
-
-        @Override
-        public PayloadCarryingRpcController newController(CellScanner cellScanner) {
-            PayloadCarryingRpcController controller = delegate.newController(cellScanner);
-            return new CountingIndexClientRpcController(controller);
-        }
-
-        @Override
-        public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
-            PayloadCarryingRpcController controller = delegate.newController(cellIterables);
-            return new CountingIndexClientRpcController(controller);
-        }
-    }
-
-    public static class CountingIndexClientRpcController extends
-            DelegatingPayloadCarryingRpcController {
-
-        private static Map<Integer, Integer> priorityCounts = new HashMap<Integer, Integer>();
-
-        public CountingIndexClientRpcController(PayloadCarryingRpcController delegate) {
-            super(delegate);
-        }
-
-        @Override
-        public void setPriority(int pri) {
-            Integer count = priorityCounts.get(pri);
-            if (count == 0) {
-                count = new Integer(0);
-            }
-            count = count.intValue() + 1;
-            priorityCounts.put(pri, count);
-
-        }
-    }
-
-    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
-    private static final byte[] row = Bytes.toBytes("row");
-    private static final byte[] family = Bytes.toBytes("FAM");
-    private static final byte[] qual = Bytes.toBytes("qual");
-    private static final HColumnDescriptor FAM1 = new HColumnDescriptor(family);
-
-    @Rule
-    public TableName TestTable = new TableName();
-
-    @BeforeClass
-    public static void setupCluster() throws Exception {
-        UTIL.startMiniCluster();
-    }
-
-    @AfterClass
-    public static void shutdownCluster() throws Exception {
-        UTIL.shutdownMiniCluster();
-    }
-
-    @Before
-    public void setup() throws Exception {
-        HTableDescriptor desc =
-                new HTableDescriptor(org.apache.hadoop.hbase.TableName.valueOf(TestTable
-                        .getTableNameString()));
-        desc.addFamily(FAM1);
-
-        // create the table
-        HBaseAdmin admin = UTIL.getHBaseAdmin();
-        admin.createTable(desc);
-    }
-
-    @After
-    public void cleanup() throws Exception {
-        HBaseAdmin admin = UTIL.getHBaseAdmin();
-        admin.disableTable(TestTable.getTableName());
-        admin.deleteTable(TestTable.getTableName());
-    }
-
-    @Test
-    public void testClientWritesWithPriority() throws Exception {
-        Configuration conf = new Configuration(UTIL.getConfiguration());
-        // add the keys for our rpc factory
-        conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
-            CountingIndexClientRpcFactory.class.getName());
-        // and set the index table as the current table
-        conf.setStrings(IndexQosRpcControllerFactory.INDEX_TABLE_NAMES_KEY,
-            TestTable.getTableNameString());
-        HTable table = new HTable(conf, TestTable.getTableName());
-
-        // do a write to the table
-        Put p = new Put(row);
-        p.add(family, qual, new byte[] { 1, 0, 1, 0 });
-        table.put(p);
-        table.flushCommits();
-
-        // check the counts on the rpc controller
-        assertEquals("Didn't get the expected number of index priority writes!", (int) 1,
-            (int) CountingIndexClientRpcController.priorityCounts
-                    .get(PhoenixIndexRpcSchedulerFactory.DEFAULT_INDEX_MIN_PRIORITY));
-
-        table.close();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
deleted file mode 100644
index e5bd5d1..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ipc.CallRunner;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-/**
- * {@link RpcScheduler} that first checks to see if this is an index update before passing off the
- * call to the delegate {@link RpcScheduler}.
- * <p>
- * We reserve the range (200, 250], by default (though it is configurable), for index priority
- * writes. Currently, we don't do any prioritization within that range - all index writes are
- * treated with the same priority and put into the same queue.
- */
-public class PhoenixIndexRpcScheduler implements RpcScheduler {
-
-    private LinkedBlockingQueue<CallRunner> indexCallQueue;
-    private RpcScheduler delegate;
-    private final int handlerCount;
-    private volatile boolean running;
-    private int port;
-    private final List<Thread> handlers = Lists.newArrayList();
-    private int minPriority;
-    private int maxPriority;
-
-    public PhoenixIndexRpcScheduler(int indexHandlerCount, Configuration conf,
-            RpcScheduler delegate, int minPriority, int maxPriority) {
-        int maxQueueLength =
-                conf.getInt("ipc.server.max.callqueue.length", indexHandlerCount
-                        * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
-        this.minPriority = minPriority;
-        this.maxPriority = maxPriority;
-
-        this.indexCallQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength);
-        this.handlerCount = indexHandlerCount;
-        this.delegate = delegate;
-    }
-
-    @Override
-    public void init(Context context) {
-        delegate.init(context);
-        this.port = context.getListenerAddress().getPort();
-    }
-
-    @Override
-    public void start() {
-        delegate.start();
-        running = true;
-        startHandlers(handlerCount, indexCallQueue, "PhoenixIndexing.");
-    }
-
-    @Override
-    public void stop() {
-        running = false;
-        for (Thread handler : handlers) {
-            handler.interrupt();
-        }
-        delegate.stop();
-    }
-
-    @Override
-    public void dispatch(CallRunner callTask) throws InterruptedException, IOException {
-        RpcServer.Call call = callTask.getCall();
-        int priority = call.header.getPriority();
-        if (minPriority <= priority && priority < maxPriority) {
-            indexCallQueue.put(callTask);
-        } else {
-            delegate.dispatch(callTask);
-        }
-    }
-
-    @Override
-    public int getGeneralQueueLength() {
-        // not the best way to calculate, but don't have a better way to hook
-        // into metrics at the moment
-        return this.delegate.getGeneralQueueLength() + indexCallQueue.size();
-    }
-
-    @Override
-    public int getPriorityQueueLength() {
-        return this.delegate.getPriorityQueueLength();
-    }
-
-    @Override
-    public int getReplicationQueueLength() {
-        return this.delegate.getReplicationQueueLength();
-    }
-
-    // ****************************************************
-    // Below copied from SimpleRpcScheduler for visibility
-    // *****************************************************
-    private void startHandlers(int handlerCount, final BlockingQueue<CallRunner> callQueue,
-            String threadNamePrefix) {
-        for (int i = 0; i < handlerCount; i++) {
-            Thread t = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    consumerLoop(callQueue);
-                }
-            });
-            t.setDaemon(true);
-            t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port="
-                    + port);
-            t.start();
-            handlers.add(t);
-        }
-    }
-
-    private void consumerLoop(BlockingQueue<CallRunner> myQueue) {
-        boolean interrupted = false;
-        try {
-            while (running) {
-                try {
-                    CallRunner task = myQueue.take();
-                    task.run();
-                } catch (InterruptedException e) {
-                    interrupted = true;
-                }
-            }
-        } finally {
-            if (interrupted) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    @VisibleForTesting
-    public void setIndexCallQueueForTesting(LinkedBlockingQueue<CallRunner> indexQueue) {
-        this.indexCallQueue = indexQueue;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactory.java
deleted file mode 100644
index 6fbf462..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ipc.PhoenixIndexRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Factory to create a {@link PhoenixIndexRpcScheduler}. In this package so we can access the
- * {@link SimpleRpcSchedulerFactory}.
- */
-public class PhoenixIndexRpcSchedulerFactory implements RpcSchedulerFactory {
-
-    private static final String INDEX_HANDLER_COUNT_KEY =
-            "org.apache.phoenix.regionserver.index.handler.count";
-    private static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
-
-    /**
-     * HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate
-     * and give some room for things in the middle
-     */
-    public static final int DEFAULT_INDEX_MIN_PRIORITY = 200;
-    public static final int DEFAULT_INDEX_MAX_PRIORITY = 250;
-    public static final String MIN_INDEX_PRIOIRTY_KEY =
-            "org.apache.phoenix.regionserver.index.priority.min";
-    public static final String MAX_INDEX_PRIOIRTY_KEY =
-            "org.apache.phoenix.regionserver.index.priority.max";
-
-    @Override
-    public RpcScheduler create(Configuration conf, RegionServerServices services) {
-        // create the delegate scheduler
-        RpcScheduler delegate = new SimpleRpcSchedulerFactory().create(conf, services);
-        int indexHandlerCount = conf.getInt(INDEX_HANDLER_COUNT_KEY, DEFAULT_INDEX_HANDLER_COUNT);
-        int minPriority = getMinPriority(conf);
-        int maxPriority = conf.getInt(MAX_INDEX_PRIOIRTY_KEY, DEFAULT_INDEX_MAX_PRIORITY);
-        // make sure the ranges are outside the warning ranges
-        Preconditions.checkArgument(maxPriority > minPriority, "Max index priority (" + maxPriority
-                + ") must be larger than min priority (" + minPriority + ")");
-        boolean allSmaller =
-                minPriority < HConstants.REPLICATION_QOS
-                        && maxPriority < HConstants.REPLICATION_QOS;
-        boolean allLarger = minPriority > HConstants.HIGH_QOS;
-        Preconditions.checkArgument(allSmaller || allLarger, "Index priority range (" + minPriority
-                + ",  " + maxPriority + ") must be outside HBase priority range ("
-                + HConstants.REPLICATION_QOS + ", " + HConstants.HIGH_QOS + ")");
-
-        PhoenixIndexRpcScheduler scheduler =
-                new PhoenixIndexRpcScheduler(indexHandlerCount, conf, delegate, minPriority,
-                        maxPriority);
-        return scheduler;
-    }
-
-    public static int getMinPriority(Configuration conf) {
-        return conf.getInt(MIN_INDEX_PRIOIRTY_KEY, DEFAULT_INDEX_MIN_PRIORITY);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
deleted file mode 100644
index 04a0f7c..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.PhoenixIndexRpcSchedulerFactory;
-
-/**
- * {@link RpcControllerFactory} that overrides the standard {@link PayloadCarryingRpcController} to
- * allow the configured index tables (via {@link #INDEX_TABLE_NAMES_KEY}) to use the Index priority.
- */
-public class IndexQosRpcControllerFactory extends RpcControllerFactory {
-
-    public static final String INDEX_TABLE_NAMES_KEY = "phoenix.index.rpc.controller.index-tables";
-
-    public IndexQosRpcControllerFactory(Configuration conf) {
-        super(conf);
-    }
-
-    @Override
-    public PayloadCarryingRpcController newController() {
-        PayloadCarryingRpcController delegate = super.newController();
-        return new IndexQosRpcController(delegate, conf);
-    }
-
-    @Override
-    public PayloadCarryingRpcController newController(CellScanner cellScanner) {
-        PayloadCarryingRpcController delegate = super.newController(cellScanner);
-        return new IndexQosRpcController(delegate, conf);
-    }
-
-    @Override
-    public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
-        PayloadCarryingRpcController delegate = super.newController(cellIterables);
-        return new IndexQosRpcController(delegate, conf);
-    }
-
-    /**
-     * @param tableName name of the index table
-     * @return configuration key for if a table should have Index QOS writes (its a target index
-     *         table)
-     */
-    public static String getTableIndexQosConfKey(String tableName) {
-        return "phoenix.index.table.qos._" + tableName;
-    }
-
-    private class IndexQosRpcController extends DelegatingPayloadCarryingRpcController {
-
-        private Configuration conf;
-        private int priority;
-
-        public IndexQosRpcController(PayloadCarryingRpcController delegate, Configuration conf) {
-            super(delegate);
-            this.conf = conf;
-            this.priority = PhoenixIndexRpcSchedulerFactory.getMinPriority(conf);
-        }
-
-        @Override
-        public void setPriority(final TableName tn) {
-            // if its an index table, then we override to the index priority
-            if (isIndexTable(tn)) {
-                setPriority(this.priority);
-            } else {
-                super.setPriority(tn);
-            }
-        }
-
-        private boolean isIndexTable(TableName tn) {
-            return conf.get(getTableIndexQosConfKey(tn.getNameAsString())) == null;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
index 8ccb0f3..8ef3e4f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.phoenix.hbase.index.table;
 
 import java.io.IOException;
@@ -26,56 +27,43 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.PhoenixIndexRpcSchedulerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory;
+
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 
 public class CoprocessorHTableFactory implements HTableFactory {
 
-    /** Number of milliseconds per-interval to retry zookeeper */
-    private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL =
-            "zookeeper.recovery.retry.intervalmill";
-    /** Number of retries for zookeeper */
-    private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = "zookeeper.recovery.retry";
-    private static final Log LOG = LogFactory.getLog(CoprocessorHTableFactory.class);
-    private CoprocessorEnvironment e;
+  /** Number of milliseconds per-interval to retry zookeeper */
+  private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL = "zookeeper.recovery.retry.intervalmill";
+  /** Number of retries for zookeeper */
+  private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = "zookeeper.recovery.retry";
+  private static final Log LOG = LogFactory.getLog(CoprocessorHTableFactory.class);
+  private CoprocessorEnvironment e;
 
-    public CoprocessorHTableFactory(CoprocessorEnvironment e) {
-        this.e = e;
-    }
+  public CoprocessorHTableFactory(CoprocessorEnvironment e) {
+    this.e = e;
+  }
 
-    @Override
-    public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
-        Configuration conf = e.getConfiguration();
-        // make sure writers fail fast
-        IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
-        IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000);
-        IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3);
-        IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100);
-        IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 30000);
-        IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
+  @Override
+  public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+    Configuration conf = e.getConfiguration();
+    // make sure writers fail fast
+    IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+    IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000);
+    IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3);
+    IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100);
+    IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 30000);
+    IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
 
-        // make sure we use the index priority writer for our rpcs
-        conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
-            PhoenixIndexRpcSchedulerFactory.class.getName());
-        // make sure we include the index table in the tables we need to track
-        String tableName = Bytes.toString(tablename.copyBytesIfNecessary());
-        String confKey = IndexQosRpcControllerFactory.getTableIndexQosConfKey(tableName);
-        if (conf.get(confKey) == null) {
-            conf.setBoolean(confKey, true);
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Creating new HTable: " + tableName);
-        }
-        return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating new HTable: " + Bytes.toString(tablename.copyBytesIfNecessary()));
     }
+    return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()));
+  }
 
-    @Override
-    public void shutdown() {
-        // noop
-    }
+  @Override
+  public void shutdown() {
+    // noop
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
deleted file mode 100644
index 875456c..0000000
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ipc.RpcScheduler.Context;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- * Test that the rpc scheduler schedules index writes to the index handler queue and sends
- * everything else to the standard queues
- */
-public class PhoenixIndexRpcSchedulerTest {
-
-    private static final Configuration conf = HBaseConfiguration.create();
-
-    @Test
-    public void testIndexPriorityWritesToIndexHandler() throws Exception {
-        RpcScheduler mock = Mockito.mock(RpcScheduler.class);
-
-        PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 200, 250);
-        LinkedBlockingQueue<CallRunner> queue = new LinkedBlockingQueue<CallRunner>();
-        scheduler.setIndexCallQueueForTesting(queue);
-        dispatchCallWithPriority(scheduler, 200);
-        queue.poll(20, TimeUnit.SECONDS);
-
-        // try again, this time we tweak the ranges we support
-        scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110);
-        scheduler.setIndexCallQueueForTesting(queue);
-        dispatchCallWithPriority(scheduler, 101);
-        queue.poll(20, TimeUnit.SECONDS);
-
-        Mockito.verify(mock, Mockito.times(2)).init(Mockito.any(Context.class));
-        Mockito.verifyNoMoreInteractions(mock);
-    }
-
-    /**
-     * Test that we delegate to the passed {@link RpcScheduler} when the call priority is outside
-     * the index range
-     * @throws Exception
-     */
-    @Test
-    public void testDelegateWhenOutsideRange() throws Exception {
-        RpcScheduler mock = Mockito.mock(RpcScheduler.class);
-        PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 200, 250);
-        dispatchCallWithPriority(scheduler, 100);
-        dispatchCallWithPriority(scheduler, 250);
-
-        // try again, this time we tweak the ranges we support
-        scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110);
-        dispatchCallWithPriority(scheduler, 200);
-        dispatchCallWithPriority(scheduler, 110);
-
-        Mockito.verify(mock, Mockito.times(4)).init(Mockito.any(Context.class));
-        Mockito.verify(mock, Mockito.times(4)).dispatch(Mockito.any(CallRunner.class));
-        Mockito.verifyNoMoreInteractions(mock);
-    }
-
-    private void dispatchCallWithPriority(RpcScheduler scheduler, int priority) throws Exception {
-        CallRunner task = Mockito.mock(CallRunner.class);
-        RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
-        RpcServer server = new RpcServer(null, "test-rpcserver", null, null, conf, scheduler);
-        RpcServer.Call call =
-                server.new Call(0, null, null, header, null, null, null, null, 10, null);
-        Mockito.when(task.getCall()).thenReturn(call);
-
-        scheduler.dispatch(task);
-
-        Mockito.verify(task).getCall();
-        Mockito.verifyNoMoreInteractions(task);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java
deleted file mode 100644
index 1c78eb2..0000000
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-public class PhoenixIndexRpcSchedulerFactoryTest {
-
-    @Test
-    public void ensureInstantiation() throws Exception {
-        Configuration conf = new Configuration(false);
-        conf.setClass(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
-            PhoenixIndexRpcSchedulerFactory.class, RpcSchedulerFactory.class);
-        // kinda lame that we copy the copy from the regionserver to do this and can't use a static
-        // method, but meh
-        try {
-            Class<?> rpcSchedulerFactoryClass =
-                    conf.getClass(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
-                        SimpleRpcSchedulerFactory.class);
-            Object o = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
-            assertTrue(o instanceof PhoenixIndexRpcSchedulerFactory);
-        } catch (InstantiationException e) {
-            assertTrue("Should not have got an exception when instantiing the rpc scheduler: " + e,
-                false);
-        } catch (IllegalAccessException e) {
-            assertTrue("Should not have got an exception when instantiing the rpc scheduler: " + e,
-                false);
-        }
-    }
-
-    /**
-     * Ensure that we can't configure the index priority ranges inside the hbase ranges
-     * @throws Exception
-     */
-    @Test
-    public void testValidateIndexPriorityRanges() throws Exception {
-        Configuration conf = new Configuration(false);
-        // standard configs should be fine
-        PhoenixIndexRpcSchedulerFactory factory = new PhoenixIndexRpcSchedulerFactory();
-        factory.create(conf, null);
-
-        setMinMax(conf, 0, 4);
-        factory.create(conf, null);
-
-        setMinMax(conf, 101, 102);
-        factory.create(conf, null);
-
-        setMinMax(conf, 102, 101);
-        try {
-            factory.create(conf, null);
-            fail("Should not have allowed max less than min");
-        } catch (IllegalArgumentException e) {
-            // expected
-        }
-
-        setMinMax(conf, 5, 6);
-        try {
-            factory.create(conf, null);
-            fail("Should not have allowed min in range");
-        } catch (IllegalArgumentException e) {
-            // expected
-        }
-
-        setMinMax(conf, 6, 60);
-        try {
-            factory.create(conf, null);
-            fail("Should not have allowed min/max in hbase range");
-        } catch (IllegalArgumentException e) {
-            // expected
-        }
-
-        setMinMax(conf, 6, 101);
-        try {
-            factory.create(conf, null);
-            fail("Should not have allowed in range");
-        } catch (IllegalArgumentException e) {
-            // expected
-        }
-    }
-
-    private void setMinMax(Configuration conf, int min, int max) {
-        conf.setInt(PhoenixIndexRpcSchedulerFactory.MIN_INDEX_PRIOIRTY_KEY, min);
-        conf.setInt(PhoenixIndexRpcSchedulerFactory.MAX_INDEX_PRIOIRTY_KEY, max);
-    }
-}
\ No newline at end of file