You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/05/26 06:27:31 UTC
git commit: Revert "Use higher priority queue for index updates to
prevent deadlock"
Repository: incubator-phoenix
Updated Branches:
refs/heads/master edc75a2c2 -> 817380820
Revert "Use higher priority queue for index updates to prevent deadlock"
Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/81738082
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/81738082
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/81738082
Branch: refs/heads/master
Commit: 817380820a34b28406067cc529395fb85ce862b2
Parents: edc75a2
Author: James Taylor <jt...@salesforce.com>
Authored: Sun May 25 21:28:31 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun May 25 21:28:31 2014 -0700
----------------------------------------------------------------------
.../phoenix/end2end/index/IndexHandlerIT.java | 166 -------------------
.../hbase/ipc/PhoenixIndexRpcScheduler.java | 158 ------------------
.../PhoenixIndexRpcSchedulerFactory.java | 75 ---------
.../index/IndexQosRpcControllerFactory.java | 95 -----------
.../index/table/CoprocessorHTableFactory.java | 70 ++++----
.../hbase/ipc/PhoenixIndexRpcSchedulerTest.java | 93 -----------
.../PhoenixIndexRpcSchedulerFactoryTest.java | 104 ------------
7 files changed, 29 insertions(+), 732 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
deleted file mode 100644
index 7efaf71..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.PhoenixIndexRpcSchedulerFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory;
-import org.apache.phoenix.hbase.index.TableName;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/**
- * Comprehensive test that ensures we are adding custom index handlers
- */
-public class IndexHandlerIT {
-
- public static class CountingIndexClientRpcFactory extends RpcControllerFactory {
-
- private IndexQosRpcControllerFactory delegate;
-
- public CountingIndexClientRpcFactory(Configuration conf) {
- super(conf);
- this.delegate = new IndexQosRpcControllerFactory(conf);
- }
-
- @Override
- public PayloadCarryingRpcController newController() {
- PayloadCarryingRpcController controller = delegate.newController();
- return new CountingIndexClientRpcController(controller);
- }
-
- @Override
- public PayloadCarryingRpcController newController(CellScanner cellScanner) {
- PayloadCarryingRpcController controller = delegate.newController(cellScanner);
- return new CountingIndexClientRpcController(controller);
- }
-
- @Override
- public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
- PayloadCarryingRpcController controller = delegate.newController(cellIterables);
- return new CountingIndexClientRpcController(controller);
- }
- }
-
- public static class CountingIndexClientRpcController extends
- DelegatingPayloadCarryingRpcController {
-
- private static Map<Integer, Integer> priorityCounts = new HashMap<Integer, Integer>();
-
- public CountingIndexClientRpcController(PayloadCarryingRpcController delegate) {
- super(delegate);
- }
-
- @Override
- public void setPriority(int pri) {
- Integer count = priorityCounts.get(pri);
- if (count == 0) {
- count = new Integer(0);
- }
- count = count.intValue() + 1;
- priorityCounts.put(pri, count);
-
- }
- }
-
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
- private static final byte[] row = Bytes.toBytes("row");
- private static final byte[] family = Bytes.toBytes("FAM");
- private static final byte[] qual = Bytes.toBytes("qual");
- private static final HColumnDescriptor FAM1 = new HColumnDescriptor(family);
-
- @Rule
- public TableName TestTable = new TableName();
-
- @BeforeClass
- public static void setupCluster() throws Exception {
- UTIL.startMiniCluster();
- }
-
- @AfterClass
- public static void shutdownCluster() throws Exception {
- UTIL.shutdownMiniCluster();
- }
-
- @Before
- public void setup() throws Exception {
- HTableDescriptor desc =
- new HTableDescriptor(org.apache.hadoop.hbase.TableName.valueOf(TestTable
- .getTableNameString()));
- desc.addFamily(FAM1);
-
- // create the table
- HBaseAdmin admin = UTIL.getHBaseAdmin();
- admin.createTable(desc);
- }
-
- @After
- public void cleanup() throws Exception {
- HBaseAdmin admin = UTIL.getHBaseAdmin();
- admin.disableTable(TestTable.getTableName());
- admin.deleteTable(TestTable.getTableName());
- }
-
- @Test
- public void testClientWritesWithPriority() throws Exception {
- Configuration conf = new Configuration(UTIL.getConfiguration());
- // add the keys for our rpc factory
- conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
- CountingIndexClientRpcFactory.class.getName());
- // and set the index table as the current table
- conf.setStrings(IndexQosRpcControllerFactory.INDEX_TABLE_NAMES_KEY,
- TestTable.getTableNameString());
- HTable table = new HTable(conf, TestTable.getTableName());
-
- // do a write to the table
- Put p = new Put(row);
- p.add(family, qual, new byte[] { 1, 0, 1, 0 });
- table.put(p);
- table.flushCommits();
-
- // check the counts on the rpc controller
- assertEquals("Didn't get the expected number of index priority writes!", (int) 1,
- (int) CountingIndexClientRpcController.priorityCounts
- .get(PhoenixIndexRpcSchedulerFactory.DEFAULT_INDEX_MIN_PRIORITY));
-
- table.close();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
deleted file mode 100644
index e5bd5d1..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ipc.CallRunner;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-/**
- * {@link RpcScheduler} that first checks to see if this is an index update before passing off the
- * call to the delegate {@link RpcScheduler}.
- * <p>
- * We reserve the range (200, 250], by default (though it is configurable), for index priority
- * writes. Currently, we don't do any prioritization within that range - all index writes are
- * treated with the same priority and put into the same queue.
- */
-public class PhoenixIndexRpcScheduler implements RpcScheduler {
-
- private LinkedBlockingQueue<CallRunner> indexCallQueue;
- private RpcScheduler delegate;
- private final int handlerCount;
- private volatile boolean running;
- private int port;
- private final List<Thread> handlers = Lists.newArrayList();
- private int minPriority;
- private int maxPriority;
-
- public PhoenixIndexRpcScheduler(int indexHandlerCount, Configuration conf,
- RpcScheduler delegate, int minPriority, int maxPriority) {
- int maxQueueLength =
- conf.getInt("ipc.server.max.callqueue.length", indexHandlerCount
- * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
- this.minPriority = minPriority;
- this.maxPriority = maxPriority;
-
- this.indexCallQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength);
- this.handlerCount = indexHandlerCount;
- this.delegate = delegate;
- }
-
- @Override
- public void init(Context context) {
- delegate.init(context);
- this.port = context.getListenerAddress().getPort();
- }
-
- @Override
- public void start() {
- delegate.start();
- running = true;
- startHandlers(handlerCount, indexCallQueue, "PhoenixIndexing.");
- }
-
- @Override
- public void stop() {
- running = false;
- for (Thread handler : handlers) {
- handler.interrupt();
- }
- delegate.stop();
- }
-
- @Override
- public void dispatch(CallRunner callTask) throws InterruptedException, IOException {
- RpcServer.Call call = callTask.getCall();
- int priority = call.header.getPriority();
- if (minPriority <= priority && priority < maxPriority) {
- indexCallQueue.put(callTask);
- } else {
- delegate.dispatch(callTask);
- }
- }
-
- @Override
- public int getGeneralQueueLength() {
- // not the best way to calculate, but don't have a better way to hook
- // into metrics at the moment
- return this.delegate.getGeneralQueueLength() + indexCallQueue.size();
- }
-
- @Override
- public int getPriorityQueueLength() {
- return this.delegate.getPriorityQueueLength();
- }
-
- @Override
- public int getReplicationQueueLength() {
- return this.delegate.getReplicationQueueLength();
- }
-
- // ****************************************************
- // Below copied from SimpleRpcScheduler for visibility
- // *****************************************************
- private void startHandlers(int handlerCount, final BlockingQueue<CallRunner> callQueue,
- String threadNamePrefix) {
- for (int i = 0; i < handlerCount; i++) {
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- consumerLoop(callQueue);
- }
- });
- t.setDaemon(true);
- t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port="
- + port);
- t.start();
- handlers.add(t);
- }
- }
-
- private void consumerLoop(BlockingQueue<CallRunner> myQueue) {
- boolean interrupted = false;
- try {
- while (running) {
- try {
- CallRunner task = myQueue.take();
- task.run();
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- @VisibleForTesting
- public void setIndexCallQueueForTesting(LinkedBlockingQueue<CallRunner> indexQueue) {
- this.indexCallQueue = indexQueue;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactory.java
deleted file mode 100644
index 6fbf462..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ipc.PhoenixIndexRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Factory to create a {@link PhoenixIndexRpcScheduler}. In this package so we can access the
- * {@link SimpleRpcSchedulerFactory}.
- */
-public class PhoenixIndexRpcSchedulerFactory implements RpcSchedulerFactory {
-
- private static final String INDEX_HANDLER_COUNT_KEY =
- "org.apache.phoenix.regionserver.index.handler.count";
- private static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
-
- /**
- * HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate
- * and give some room for things in the middle
- */
- public static final int DEFAULT_INDEX_MIN_PRIORITY = 200;
- public static final int DEFAULT_INDEX_MAX_PRIORITY = 250;
- public static final String MIN_INDEX_PRIOIRTY_KEY =
- "org.apache.phoenix.regionserver.index.priority.min";
- public static final String MAX_INDEX_PRIOIRTY_KEY =
- "org.apache.phoenix.regionserver.index.priority.max";
-
- @Override
- public RpcScheduler create(Configuration conf, RegionServerServices services) {
- // create the delegate scheduler
- RpcScheduler delegate = new SimpleRpcSchedulerFactory().create(conf, services);
- int indexHandlerCount = conf.getInt(INDEX_HANDLER_COUNT_KEY, DEFAULT_INDEX_HANDLER_COUNT);
- int minPriority = getMinPriority(conf);
- int maxPriority = conf.getInt(MAX_INDEX_PRIOIRTY_KEY, DEFAULT_INDEX_MAX_PRIORITY);
- // make sure the ranges are outside the warning ranges
- Preconditions.checkArgument(maxPriority > minPriority, "Max index priority (" + maxPriority
- + ") must be larger than min priority (" + minPriority + ")");
- boolean allSmaller =
- minPriority < HConstants.REPLICATION_QOS
- && maxPriority < HConstants.REPLICATION_QOS;
- boolean allLarger = minPriority > HConstants.HIGH_QOS;
- Preconditions.checkArgument(allSmaller || allLarger, "Index priority range (" + minPriority
- + ", " + maxPriority + ") must be outside HBase priority range ("
- + HConstants.REPLICATION_QOS + ", " + HConstants.HIGH_QOS + ")");
-
- PhoenixIndexRpcScheduler scheduler =
- new PhoenixIndexRpcScheduler(indexHandlerCount, conf, delegate, minPriority,
- maxPriority);
- return scheduler;
- }
-
- public static int getMinPriority(Configuration conf) {
- return conf.getInt(MIN_INDEX_PRIOIRTY_KEY, DEFAULT_INDEX_MIN_PRIORITY);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
deleted file mode 100644
index 04a0f7c..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.PhoenixIndexRpcSchedulerFactory;
-
-/**
- * {@link RpcControllerFactory} that overrides the standard {@link PayloadCarryingRpcController} to
- * allow the configured index tables (via {@link #INDEX_TABLE_NAMES_KEY}) to use the Index priority.
- */
-public class IndexQosRpcControllerFactory extends RpcControllerFactory {
-
- public static final String INDEX_TABLE_NAMES_KEY = "phoenix.index.rpc.controller.index-tables";
-
- public IndexQosRpcControllerFactory(Configuration conf) {
- super(conf);
- }
-
- @Override
- public PayloadCarryingRpcController newController() {
- PayloadCarryingRpcController delegate = super.newController();
- return new IndexQosRpcController(delegate, conf);
- }
-
- @Override
- public PayloadCarryingRpcController newController(CellScanner cellScanner) {
- PayloadCarryingRpcController delegate = super.newController(cellScanner);
- return new IndexQosRpcController(delegate, conf);
- }
-
- @Override
- public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
- PayloadCarryingRpcController delegate = super.newController(cellIterables);
- return new IndexQosRpcController(delegate, conf);
- }
-
- /**
- * @param tableName name of the index table
- * @return configuration key for if a table should have Index QOS writes (its a target index
- * table)
- */
- public static String getTableIndexQosConfKey(String tableName) {
- return "phoenix.index.table.qos._" + tableName;
- }
-
- private class IndexQosRpcController extends DelegatingPayloadCarryingRpcController {
-
- private Configuration conf;
- private int priority;
-
- public IndexQosRpcController(PayloadCarryingRpcController delegate, Configuration conf) {
- super(delegate);
- this.conf = conf;
- this.priority = PhoenixIndexRpcSchedulerFactory.getMinPriority(conf);
- }
-
- @Override
- public void setPriority(final TableName tn) {
- // if its an index table, then we override to the index priority
- if (isIndexTable(tn)) {
- setPriority(this.priority);
- } else {
- super.setPriority(tn);
- }
- }
-
- private boolean isIndexTable(TableName tn) {
- return conf.get(getTableIndexQosConfKey(tn.getNameAsString())) == null;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
index 8ccb0f3..8ef3e4f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.phoenix.hbase.index.table;
import java.io.IOException;
@@ -26,56 +27,43 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.PhoenixIndexRpcSchedulerFactory;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory;
+
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
public class CoprocessorHTableFactory implements HTableFactory {
- /** Number of milliseconds per-interval to retry zookeeper */
- private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL =
- "zookeeper.recovery.retry.intervalmill";
- /** Number of retries for zookeeper */
- private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = "zookeeper.recovery.retry";
- private static final Log LOG = LogFactory.getLog(CoprocessorHTableFactory.class);
- private CoprocessorEnvironment e;
+ /** Number of milliseconds per-interval to retry zookeeper */
+ private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL = "zookeeper.recovery.retry.intervalmill";
+ /** Number of retries for zookeeper */
+ private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = "zookeeper.recovery.retry";
+ private static final Log LOG = LogFactory.getLog(CoprocessorHTableFactory.class);
+ private CoprocessorEnvironment e;
- public CoprocessorHTableFactory(CoprocessorEnvironment e) {
- this.e = e;
- }
+ public CoprocessorHTableFactory(CoprocessorEnvironment e) {
+ this.e = e;
+ }
- @Override
- public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
- Configuration conf = e.getConfiguration();
- // make sure writers fail fast
- IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
- IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000);
- IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3);
- IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100);
- IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 30000);
- IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
+ @Override
+ public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+ Configuration conf = e.getConfiguration();
+ // make sure writers fail fast
+ IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+ IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000);
+ IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3);
+ IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100);
+ IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 30000);
+ IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
- // make sure we use the index priority writer for our rpcs
- conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
- PhoenixIndexRpcSchedulerFactory.class.getName());
- // make sure we include the index table in the tables we need to track
- String tableName = Bytes.toString(tablename.copyBytesIfNecessary());
- String confKey = IndexQosRpcControllerFactory.getTableIndexQosConfKey(tableName);
- if (conf.get(confKey) == null) {
- conf.setBoolean(confKey, true);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating new HTable: " + tableName);
- }
- return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating new HTable: " + Bytes.toString(tablename.copyBytesIfNecessary()));
}
+ return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()));
+ }
- @Override
- public void shutdown() {
- // noop
- }
+ @Override
+ public void shutdown() {
+ // noop
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
deleted file mode 100644
index 875456c..0000000
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ipc.RpcScheduler.Context;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- * Test that the rpc scheduler schedules index writes to the index handler queue and sends
- * everything else to the standard queues
- */
-public class PhoenixIndexRpcSchedulerTest {
-
- private static final Configuration conf = HBaseConfiguration.create();
-
- @Test
- public void testIndexPriorityWritesToIndexHandler() throws Exception {
- RpcScheduler mock = Mockito.mock(RpcScheduler.class);
-
- PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 200, 250);
- LinkedBlockingQueue<CallRunner> queue = new LinkedBlockingQueue<CallRunner>();
- scheduler.setIndexCallQueueForTesting(queue);
- dispatchCallWithPriority(scheduler, 200);
- queue.poll(20, TimeUnit.SECONDS);
-
- // try again, this time we tweak the ranges we support
- scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110);
- scheduler.setIndexCallQueueForTesting(queue);
- dispatchCallWithPriority(scheduler, 101);
- queue.poll(20, TimeUnit.SECONDS);
-
- Mockito.verify(mock, Mockito.times(2)).init(Mockito.any(Context.class));
- Mockito.verifyNoMoreInteractions(mock);
- }
-
- /**
- * Test that we delegate to the passed {@link RpcScheduler} when the call priority is outside
- * the index range
- * @throws Exception
- */
- @Test
- public void testDelegateWhenOutsideRange() throws Exception {
- RpcScheduler mock = Mockito.mock(RpcScheduler.class);
- PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 200, 250);
- dispatchCallWithPriority(scheduler, 100);
- dispatchCallWithPriority(scheduler, 250);
-
- // try again, this time we tweak the ranges we support
- scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110);
- dispatchCallWithPriority(scheduler, 200);
- dispatchCallWithPriority(scheduler, 110);
-
- Mockito.verify(mock, Mockito.times(4)).init(Mockito.any(Context.class));
- Mockito.verify(mock, Mockito.times(4)).dispatch(Mockito.any(CallRunner.class));
- Mockito.verifyNoMoreInteractions(mock);
- }
-
- private void dispatchCallWithPriority(RpcScheduler scheduler, int priority) throws Exception {
- CallRunner task = Mockito.mock(CallRunner.class);
- RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
- RpcServer server = new RpcServer(null, "test-rpcserver", null, null, conf, scheduler);
- RpcServer.Call call =
- server.new Call(0, null, null, header, null, null, null, null, 10, null);
- Mockito.when(task.getCall()).thenReturn(call);
-
- scheduler.dispatch(task);
-
- Mockito.verify(task).getCall();
- Mockito.verifyNoMoreInteractions(task);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/81738082/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java
deleted file mode 100644
index 1c78eb2..0000000
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-public class PhoenixIndexRpcSchedulerFactoryTest {
-
- @Test
- public void ensureInstantiation() throws Exception {
- Configuration conf = new Configuration(false);
- conf.setClass(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
- PhoenixIndexRpcSchedulerFactory.class, RpcSchedulerFactory.class);
- // kinda lame that we copy the copy from the regionserver to do this and can't use a static
- // method, but meh
- try {
- Class<?> rpcSchedulerFactoryClass =
- conf.getClass(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
- SimpleRpcSchedulerFactory.class);
- Object o = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
- assertTrue(o instanceof PhoenixIndexRpcSchedulerFactory);
- } catch (InstantiationException e) {
- assertTrue("Should not have got an exception when instantiing the rpc scheduler: " + e,
- false);
- } catch (IllegalAccessException e) {
- assertTrue("Should not have got an exception when instantiing the rpc scheduler: " + e,
- false);
- }
- }
-
- /**
- * Ensure that we can't configure the index priority ranges inside the hbase ranges
- * @throws Exception
- */
- @Test
- public void testValidateIndexPriorityRanges() throws Exception {
- Configuration conf = new Configuration(false);
- // standard configs should be fine
- PhoenixIndexRpcSchedulerFactory factory = new PhoenixIndexRpcSchedulerFactory();
- factory.create(conf, null);
-
- setMinMax(conf, 0, 4);
- factory.create(conf, null);
-
- setMinMax(conf, 101, 102);
- factory.create(conf, null);
-
- setMinMax(conf, 102, 101);
- try {
- factory.create(conf, null);
- fail("Should not have allowed max less than min");
- } catch (IllegalArgumentException e) {
- // expected
- }
-
- setMinMax(conf, 5, 6);
- try {
- factory.create(conf, null);
- fail("Should not have allowed min in range");
- } catch (IllegalArgumentException e) {
- // expected
- }
-
- setMinMax(conf, 6, 60);
- try {
- factory.create(conf, null);
- fail("Should not have allowed min/max in hbase range");
- } catch (IllegalArgumentException e) {
- // expected
- }
-
- setMinMax(conf, 6, 101);
- try {
- factory.create(conf, null);
- fail("Should not have allowed in range");
- } catch (IllegalArgumentException e) {
- // expected
- }
- }
-
- private void setMinMax(Configuration conf, int min, int max) {
- conf.setInt(PhoenixIndexRpcSchedulerFactory.MIN_INDEX_PRIOIRTY_KEY, min);
- conf.setInt(PhoenixIndexRpcSchedulerFactory.MAX_INDEX_PRIOIRTY_KEY, max);
- }
-}
\ No newline at end of file