You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/16 16:31:47 UTC
[06/50] [abbrv] phoenix git commit: PHOENIX-1457 Use high priority
queue for metadata endpoint calls
PHOENIX-1457 Use high priority queue for metadata endpoint calls
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a7d7dfb5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a7d7dfb5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a7d7dfb5
Branch: refs/heads/calcite
Commit: a7d7dfb52622a586482030ae6904d0c53ed7a4af
Parents: b256fde
Author: Thomas D'Silva <tw...@gmail.com>
Authored: Tue Mar 24 17:17:44 2015 -0700
Committer: Thomas <td...@salesforce.com>
Committed: Fri Mar 27 11:45:41 2015 -0700
----------------------------------------------------------------------
.../phoenix/end2end/index/IndexHandlerIT.java | 12 +-
.../phoenix/end2end/index/IndexQosIT.java | 243 -------------------
.../apache/phoenix/rpc/PhoenixClientRpcIT.java | 122 ++++++++++
.../apache/phoenix/rpc/PhoenixServerRpcIT.java | 235 ++++++++++++++++++
.../TestPhoenixIndexRpcSchedulerFactory.java | 64 +++++
.../hbase/ipc/PhoenixIndexRpcScheduler.java | 123 ----------
.../hadoop/hbase/ipc/PhoenixRpcScheduler.java | 123 ++++++++++
.../hbase/ipc/PhoenixRpcSchedulerFactory.java | 95 ++++++++
.../controller/ClientRpcControllerFactory.java | 60 +++++
.../ipc/controller/IndexRpcController.java | 51 ++++
.../ipc/controller/MetadataRpcController.java | 55 +++++
.../controller/ServerRpcControllerFactory.java | 62 +++++
.../index/IndexQosRpcControllerFactory.java | 82 -------
.../ipc/PhoenixIndexRpcSchedulerFactory.java | 90 -------
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 4 -
.../org/apache/phoenix/query/QueryServices.java | 5 +-
.../phoenix/query/QueryServicesOptions.java | 12 +-
.../org/apache/phoenix/util/SchemaUtil.java | 7 -
.../hbase/ipc/PhoenixIndexRpcSchedulerTest.java | 16 +-
.../PhoenixIndexRpcSchedulerFactoryTest.java | 106 --------
.../PhoenixRpcSchedulerFactoryTest.java | 125 ++++++++++
.../java/org/apache/phoenix/query/BaseTest.java | 12 +-
22 files changed, 1023 insertions(+), 681 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
index 1507d6b..20a780a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
@@ -35,8 +35,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory;
import org.apache.phoenix.hbase.index.TableName;
import org.apache.phoenix.query.QueryServicesOptions;
import org.junit.After;
@@ -53,11 +53,11 @@ public class IndexHandlerIT {
public static class CountingIndexClientRpcFactory extends RpcControllerFactory {
- private IndexQosRpcControllerFactory delegate;
+ private ServerRpcControllerFactory delegate;
public CountingIndexClientRpcFactory(Configuration conf) {
super(conf);
- this.delegate = new IndexQosRpcControllerFactory(conf);
+ this.delegate = new ServerRpcControllerFactory(conf);
}
@Override
@@ -146,8 +146,8 @@ public class IndexHandlerIT {
conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
CountingIndexClientRpcFactory.class.getName());
// and set the index table as the current table
- conf.setStrings(IndexQosRpcControllerFactory.INDEX_TABLE_NAMES_KEY,
- TestTable.getTableNameString());
+// conf.setStrings(PhoenixRpcControllerFactory.INDEX_TABLE_NAMES_KEY,
+// TestTable.getTableNameString());
HTable table = new HTable(conf, TestTable.getTableName());
// do a write to the table
@@ -159,7 +159,7 @@ public class IndexHandlerIT {
// check the counts on the rpc controller
assertEquals("Didn't get the expected number of index priority writes!", 1,
(int) CountingIndexClientRpcController.priorityCounts
- .get(QueryServicesOptions.DEFAULT_INDEX_MIN_PRIORITY));
+ .get(QueryServicesOptions.DEFAULT_INDEX_PRIORITY));
table.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexQosIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexQosIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexQosIT.java
deleted file mode 100644
index bab8f38..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexQosIT.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import static org.apache.phoenix.util.TestUtil.LOCALHOST;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.ipc.BalancedQueueRpcExecutor;
-import org.apache.hadoop.hbase.ipc.CallRunner;
-import org.apache.hadoop.hbase.ipc.PhoenixIndexRpcScheduler;
-import org.apache.hadoop.hbase.ipc.PriorityFunction;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.ipc.RpcExecutor;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory;
-import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory;
-import org.apache.phoenix.jdbc.PhoenixTestDriver;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class IndexQosIT extends BaseTest {
-
- private static final String SCHEMA_NAME = "S";
- private static final String INDEX_TABLE_NAME = "I";
- private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
- private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I");
- private static final int NUM_SLAVES = 2;
-
- private static String url;
- private static PhoenixTestDriver driver;
- private HBaseTestingUtility util;
- private HBaseAdmin admin;
- private Configuration conf;
- private static RpcExecutor spyRpcExecutor = Mockito.spy(new BalancedQueueRpcExecutor("test-queue", 30, 1, 300));
-
- /**
- * Factory that uses a spyed RpcExecutor
- */
- public static class TestPhoenixIndexRpcSchedulerFactory extends PhoenixIndexRpcSchedulerFactory {
- @Override
- public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction, Abortable abortable) {
- PhoenixIndexRpcScheduler phoenixIndexRpcScheduler = (PhoenixIndexRpcScheduler)super.create(conf, priorityFunction, abortable);
- phoenixIndexRpcScheduler.setExecutorForTesting(spyRpcExecutor);
- return phoenixIndexRpcScheduler;
- }
- }
-
- @Before
- public void doSetup() throws Exception {
- conf = HBaseConfiguration.create();
- setUpConfigForMiniCluster(conf);
- conf.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
- TestPhoenixIndexRpcSchedulerFactory.class.getName());
- conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, IndexQosRpcControllerFactory.class.getName());
- util = new HBaseTestingUtility(conf);
- // start cluster with 2 region servers
- util.startMiniCluster(NUM_SLAVES);
- admin = util.getHBaseAdmin();
- String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
- url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
- + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
- driver = initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS);
- }
-
- @After
- public void tearDown() throws Exception {
- try {
- destroyDriver(driver);
- if (admin!=null) {
- admin.close();
- }
- } finally {
- util.shutdownMiniCluster();
- }
- }
-
- @Test
- public void testIndexWriteQos() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = driver.connect(url, props);
-
- // create the table
- conn.createStatement().execute(
- "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-
- // create the index
- conn.createStatement().execute(
- "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
-
- byte[] dataTableName = Bytes.toBytes(DATA_TABLE_FULL_NAME);
- byte[] indexTableName = Bytes.toBytes(INDEX_TABLE_FULL_NAME);
- MiniHBaseCluster cluster = util.getHBaseCluster();
- HMaster master = cluster.getMaster();
- AssignmentManager am = master.getAssignmentManager();
-
- // verify there is only a single region for data table
- List<HRegionInfo> tableRegions = admin.getTableRegions(dataTableName);
- assertEquals("Expected single region for " + dataTableName, tableRegions.size(), 1);
- HRegionInfo dataHri = tableRegions.get(0);
-
- // verify there is only a single region for index table
- tableRegions = admin.getTableRegions(indexTableName);
- HRegionInfo indexHri = tableRegions.get(0);
- assertEquals("Expected single region for " + indexTableName, tableRegions.size(), 1);
-
- ServerName dataServerName = am.getRegionStates().getRegionServerOfRegion(dataHri);
- ServerName indexServerName = am.getRegionStates().getRegionServerOfRegion(indexHri);
-
- // if data table and index table are on same region server, move the index table to the other region server
- if (dataServerName.equals(indexServerName)) {
- HRegionServer server1 = util.getHBaseCluster().getRegionServer(0);
- HRegionServer server2 = util.getHBaseCluster().getRegionServer(1);
- HRegionServer dstServer = null;
- HRegionServer srcServer = null;
- if (server1.getServerName().equals(indexServerName)) {
- dstServer = server2;
- srcServer = server1;
- } else {
- dstServer = server1;
- srcServer = server2;
- }
- byte[] encodedRegionNameInBytes = indexHri.getEncodedNameAsBytes();
- admin.move(encodedRegionNameInBytes, Bytes.toBytes(dstServer.getServerName().getServerName()));
- while (dstServer.getOnlineRegion(indexHri.getRegionName()) == null
- || dstServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes)
- || srcServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes)
- || master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
- // wait for the move to be finished
- Thread.sleep(1);
- }
- }
-
- dataHri = admin.getTableRegions(dataTableName).get(0);
- dataServerName = am.getRegionStates().getRegionServerOfRegion(dataHri);
- indexHri = admin.getTableRegions(indexTableName).get(0);
- indexServerName = am.getRegionStates().getRegionServerOfRegion(indexHri);
-
- // verify index and data tables are on different servers
- assertNotEquals("Index and Data table should be on different region servers dataServer " + dataServerName
- + " indexServer " + indexServerName, dataServerName, indexServerName);
-
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
- stmt.setString(1, "k1");
- stmt.setString(2, "v1");
- stmt.setString(3, "v2");
- stmt.execute();
- conn.commit();
-
- // run select query that should use the index
- String selectSql = "SELECT k, v2 from " + DATA_TABLE_FULL_NAME + " WHERE v1=?";
- stmt = conn.prepareStatement(selectSql);
- stmt.setString(1, "v1");
-
- // verify that the query does a range scan on the index table
- ResultSet rs = stmt.executeQuery("EXPLAIN " + selectSql);
- assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER S.I ['v1']", QueryUtil.getExplainPlan(rs));
-
- // verify that the correct results are returned
- rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals("k1", rs.getString(1));
- assertEquals("v2", rs.getString(2));
- assertFalse(rs.next());
-
- // drop index table
- conn.createStatement().execute(
- "DROP INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME );
- // create a data table with the same name as the index table
- conn.createStatement().execute(
- "CREATE TABLE " + INDEX_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-
- // upsert one row to the table (which has the same table name as the previous index table)
- stmt = conn.prepareStatement("UPSERT INTO " + INDEX_TABLE_FULL_NAME + " VALUES(?,?,?)");
- stmt.setString(1, "k1");
- stmt.setString(2, "v1");
- stmt.setString(3, "v2");
- stmt.execute();
- conn.commit();
-
- // run select query on the new table
- selectSql = "SELECT k, v2 from " + INDEX_TABLE_FULL_NAME + " WHERE v1=?";
- stmt = conn.prepareStatement(selectSql);
- stmt.setString(1, "v1");
-
- // verify that the correct results are returned
- rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals("k1", rs.getString(1));
- assertEquals("v2", rs.getString(2));
- assertFalse(rs.next());
-
- // verify that that index queue is used only once (for the first upsert)
- Mockito.verify(spyRpcExecutor).dispatch(Mockito.any(CallRunner.class));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java
new file mode 100644
index 0000000..c079a30
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.rpc;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.ipc.CallRunner;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Maps;
+
+public class PhoenixClientRpcIT extends BaseOwnClusterHBaseManagedTimeIT {
+
+ private static final String SCHEMA_NAME = "S";
+ private static final String INDEX_TABLE_NAME = "I";
+ private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+ serverProps.put(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+ TestPhoenixIndexRpcSchedulerFactory.class.getName());
+ serverProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ServerRpcControllerFactory.class.getName());
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+ clientProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ClientRpcControllerFactory.class.getName());
+ NUM_SLAVES_BASE = 2;
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet()
+ .iterator()));
+ }
+
+ @AfterClass
+ public static void doTeardown() throws Exception {
+ TestPhoenixIndexRpcSchedulerFactory.reset();
+ }
+
+ @Test
+ public void testIndexQos() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = driver.connect(getUrl(), props);
+ try {
+ // create the table
+ conn.createStatement().execute(
+ "CREATE TABLE " + DATA_TABLE_FULL_NAME
+ + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true");
+
+ // create the index
+ conn.createStatement().execute(
+ "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+ stmt.setString(1, "k1");
+ stmt.setString(2, "v1");
+ stmt.setString(3, "v2");
+ stmt.execute();
+ conn.commit();
+
+ // run select query that should use the index
+ String selectSql = "SELECT k, v2 from " + DATA_TABLE_FULL_NAME + " WHERE v1=?";
+ stmt = conn.prepareStatement(selectSql);
+ stmt.setString(1, "v1");
+
+ // verify that the query does a range scan on the index table
+ ResultSet rs = stmt.executeQuery("EXPLAIN " + selectSql);
+ assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER S.I ['v1']", QueryUtil.getExplainPlan(rs));
+
+ // verify that the correct results are returned
+ rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("k1", rs.getString(1));
+ assertEquals("v2", rs.getString(2));
+ assertFalse(rs.next());
+
+ // verify that index queue is not used (since the index writes originate from a client an not a region server)
+ Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.never()).dispatch(Mockito.any(CallRunner.class));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testMetadataQos() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = driver.connect(getUrl(), props);
+ try {
+ // create the table
+ conn.createStatement().execute("CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR)");
+ // verify that that metadata queue is used at least once
+ Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getMetadataRpcExecutor(), Mockito.atLeastOnce()).dispatch(Mockito.any(CallRunner.class));
+ } finally {
+ conn.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
new file mode 100644
index 0000000..de0ab84
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.rpc;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.ipc.CallRunner;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Maps;
+
+public class PhoenixServerRpcIT extends BaseOwnClusterHBaseManagedTimeIT {
+
+ private static final String SCHEMA_NAME = "S";
+ private static final String INDEX_TABLE_NAME = "I";
+ private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
+ private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I");
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+ serverProps.put(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+ TestPhoenixIndexRpcSchedulerFactory.class.getName());
+ serverProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ServerRpcControllerFactory.class.getName());
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+ clientProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, RpcControllerFactory.class.getName());
+ NUM_SLAVES_BASE = 2;
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ @AfterClass
+ public static void doTeardown() throws Exception {
+ TestPhoenixIndexRpcSchedulerFactory.reset();
+ }
+
+ @Test
+ public void testIndexQos() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = driver.connect(getUrl(), props);
+ try {
+ // create the table
+ conn.createStatement().execute(
+ "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+
+ // create the index
+ conn.createStatement().execute(
+ "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
+
+ ensureTablesOnDifferentRegionServers(DATA_TABLE_FULL_NAME, INDEX_TABLE_FULL_NAME);
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+ stmt.setString(1, "k1");
+ stmt.setString(2, "v1");
+ stmt.setString(3, "v2");
+ stmt.execute();
+ conn.commit();
+
+ // run select query that should use the index
+ String selectSql = "SELECT k, v2 from " + DATA_TABLE_FULL_NAME + " WHERE v1=?";
+ stmt = conn.prepareStatement(selectSql);
+ stmt.setString(1, "v1");
+
+ // verify that the query does a range scan on the index table
+ ResultSet rs = stmt.executeQuery("EXPLAIN " + selectSql);
+ assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER S.I ['v1']", QueryUtil.getExplainPlan(rs));
+
+ // verify that the correct results are returned
+ rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("k1", rs.getString(1));
+ assertEquals("v2", rs.getString(2));
+ assertFalse(rs.next());
+
+ // drop index table
+ conn.createStatement().execute(
+ "DROP INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME );
+ // create a data table with the same name as the index table
+ conn.createStatement().execute(
+ "CREATE TABLE " + INDEX_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+
+ // upsert one row to the table (which has the same table name as the previous index table)
+ stmt = conn.prepareStatement("UPSERT INTO " + INDEX_TABLE_FULL_NAME + " VALUES(?,?,?)");
+ stmt.setString(1, "k1");
+ stmt.setString(2, "v1");
+ stmt.setString(3, "v2");
+ stmt.execute();
+ conn.commit();
+
+ // run select query on the new table
+ selectSql = "SELECT k, v2 from " + INDEX_TABLE_FULL_NAME + " WHERE v1=?";
+ stmt = conn.prepareStatement(selectSql);
+ stmt.setString(1, "v1");
+
+ // verify that the correct results are returned
+ rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("k1", rs.getString(1));
+ assertEquals("v2", rs.getString(2));
+ assertFalse(rs.next());
+
+ // verify that that index queue is used only once (for the first upsert)
+ Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
+ }
+ finally {
+ conn.close();
+ }
+ }
+
+ /**
+ * Verifies that the given tables each have a single region and are on
+ * different region servers. If they are on the same server moves tableName2
+ * to the other region server.
+ */
+ private void ensureTablesOnDifferentRegionServers(String tableName1, String tableName2) throws Exception {
+ byte[] table1 = Bytes.toBytes(tableName1);
+ byte[] table2 = Bytes.toBytes(tableName2);
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TEST_PROPERTIES).getAdmin();
+ HBaseTestingUtility util = getUtility();
+ MiniHBaseCluster cluster = util.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ AssignmentManager am = master.getAssignmentManager();
+
+ // verify there is only a single region for data table
+ List<HRegionInfo> tableRegions = admin.getTableRegions(table1);
+ assertEquals("Expected single region for " + table1, tableRegions.size(), 1);
+ HRegionInfo hri1 = tableRegions.get(0);
+
+ // verify there is only a single region for index table
+ tableRegions = admin.getTableRegions(table2);
+ HRegionInfo hri2 = tableRegions.get(0);
+ assertEquals("Expected single region for " + table2, tableRegions.size(), 1);
+
+ ServerName serverName1 = am.getRegionStates().getRegionServerOfRegion(hri1);
+ ServerName serverName2 = am.getRegionStates().getRegionServerOfRegion(hri2);
+
+ // if data table and index table are on same region server, move the index table to the other region server
+ if (serverName1.equals(serverName2)) {
+ HRegionServer server1 = util.getHBaseCluster().getRegionServer(0);
+ HRegionServer server2 = util.getHBaseCluster().getRegionServer(1);
+ HRegionServer dstServer = null;
+ HRegionServer srcServer = null;
+ if (server1.getServerName().equals(serverName2)) {
+ dstServer = server2;
+ srcServer = server1;
+ } else {
+ dstServer = server1;
+ srcServer = server2;
+ }
+ byte[] encodedRegionNameInBytes = hri2.getEncodedNameAsBytes();
+ admin.move(encodedRegionNameInBytes, Bytes.toBytes(dstServer.getServerName().getServerName()));
+ while (dstServer.getOnlineRegion(hri2.getRegionName()) == null
+ || dstServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes)
+ || srcServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes)
+ || master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+ // wait for the move to be finished
+ Thread.sleep(1);
+ }
+ }
+
+ hri1 = admin.getTableRegions(table1).get(0);
+ serverName1 = am.getRegionStates().getRegionServerOfRegion(hri1);
+ hri2 = admin.getTableRegions(table2).get(0);
+ serverName2 = am.getRegionStates().getRegionServerOfRegion(hri2);
+
+ // verify index and data tables are on different servers
+ assertNotEquals("Tables " + tableName1 + " and " + tableName2 + " should be on different region servers", serverName1, serverName2);
+ }
+
+ @Test
+ public void testMetadataQos() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = driver.connect(getUrl(), props);
+ try {
+ ensureTablesOnDifferentRegionServers(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
+ // create the table
+ conn.createStatement().execute(
+ "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR)");
+ // query the table from another connection, so that SYSTEM.STATS will be used
+ conn.createStatement().execute("SELECT * FROM "+DATA_TABLE_FULL_NAME);
+ // verify that that metadata queue is used once
+ Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getMetadataRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
+ }
+ finally {
+ conn.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/it/java/org/apache/phoenix/rpc/TestPhoenixIndexRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/TestPhoenixIndexRpcSchedulerFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/TestPhoenixIndexRpcSchedulerFactory.java
new file mode 100644
index 0000000..fb29985
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/TestPhoenixIndexRpcSchedulerFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.rpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ipc.BalancedQueueRpcExecutor;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcScheduler;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RpcExecutor;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.mockito.Mockito;
+
+public class TestPhoenixIndexRpcSchedulerFactory extends PhoenixRpcSchedulerFactory {
+
+ private static RpcExecutor indexRpcExecutor = Mockito.spy(new BalancedQueueRpcExecutor("test-index-queue", 30, 1,
+ 300));
+ private static RpcExecutor metadataRpcExecutor = Mockito.spy(new BalancedQueueRpcExecutor("test-metataqueue", 30,
+ 1, 300));
+
+ @Override
+ public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction, Abortable abortable) {
+ PhoenixRpcScheduler phoenixIndexRpcScheduler = (PhoenixRpcScheduler)super.create(conf, priorityFunction, abortable);
+ phoenixIndexRpcScheduler.setIndexExecutorForTesting(indexRpcExecutor);
+ phoenixIndexRpcScheduler.setMetadataExecutorForTesting(metadataRpcExecutor);
+ return phoenixIndexRpcScheduler;
+ }
+
+ @Override
+ public RpcScheduler create(Configuration configuration, PriorityFunction priorityFunction) {
+ return create(configuration, priorityFunction, null);
+ }
+
+ public static RpcExecutor getIndexRpcExecutor() {
+ return indexRpcExecutor;
+ }
+
+ public static RpcExecutor getMetadataRpcExecutor() {
+ return metadataRpcExecutor;
+ }
+
+ public static void reset() {
+ Mockito.reset(metadataRpcExecutor);
+ Mockito.reset(indexRpcExecutor);
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
deleted file mode 100644
index 4709304..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * {@link RpcScheduler} that first checks to see if this is an index update before passing off the
- * call to the delegate {@link RpcScheduler}.
- * <p>
- * We reserve the range (1000, 1050], by default (though it is configurable), for index priority
- * writes. Currently, we don't do any prioritization within that range - all index writes are
- * treated with the same priority and put into the same queue.
- */
-public class PhoenixIndexRpcScheduler extends RpcScheduler {
-
- // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
- public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "ipc.server.callqueue.read.share";
- public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
- "ipc.server.callqueue.handler.factor";
- private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
-
- private RpcScheduler delegate;
- private int minPriority;
- private int maxPriority;
- private RpcExecutor callExecutor;
- private int port;
-
- public PhoenixIndexRpcScheduler(int indexHandlerCount, Configuration conf,
- RpcScheduler delegate, int minPriority, int maxPriority) {
- int maxQueueLength =
- conf.getInt("ipc.server.max.callqueue.length", indexHandlerCount
- * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
-
- // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
- float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
- int numCallQueues =
- Math.max(1, Math.round(indexHandlerCount * callQueuesHandlersFactor));
-
- this.minPriority = minPriority;
- this.maxPriority = maxPriority;
- this.delegate = delegate;
-
- this.callExecutor =
- new BalancedQueueRpcExecutor("Index", indexHandlerCount, numCallQueues,
- maxQueueLength);
- }
-
- @Override
- public void init(Context context) {
- delegate.init(context);
- this.port = context.getListenerAddress().getPort();
- }
-
- @Override
- public void start() {
- delegate.start();
- callExecutor.start(port);
- }
-
- @Override
- public void stop() {
- delegate.stop();
- callExecutor.stop();
- }
-
- @Override
- public void dispatch(CallRunner callTask) throws InterruptedException, IOException {
- RpcServer.Call call = callTask.getCall();
- int priority = call.header.getPriority();
- if (minPriority <= priority && priority < maxPriority) {
- callExecutor.dispatch(callTask);
- } else {
- delegate.dispatch(callTask);
- }
- }
-
- @Override
- public int getGeneralQueueLength() {
- // not the best way to calculate, but don't have a better way to hook
- // into metrics at the moment
- return this.delegate.getGeneralQueueLength() + this.callExecutor.getQueueLength();
- }
-
- @Override
- public int getPriorityQueueLength() {
- return this.delegate.getPriorityQueueLength();
- }
-
- @Override
- public int getReplicationQueueLength() {
- return this.delegate.getReplicationQueueLength();
- }
-
- @Override
- public int getActiveRpcHandlerCount() {
- return this.delegate.getActiveRpcHandlerCount() + this.callExecutor.getActiveHandlerCount();
- }
-
- @VisibleForTesting
- public void setExecutorForTesting(RpcExecutor executor) {
- this.callExecutor = executor;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
new file mode 100644
index 0000000..e721271
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link RpcScheduler} that first checks to see if this is an index or metedata update before passing off the
+ * call to the delegate {@link RpcScheduler}.
+ */
+public class PhoenixRpcScheduler extends RpcScheduler {
+
+ // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
+ private static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "ipc.server.callqueue.handler.factor";
+ private static final String CALLQUEUE_LENGTH_CONF_KEY = "ipc.server.max.callqueue.length";
+ private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
+
+ private RpcScheduler delegate;
+ private int indexPriority;
+ private int metadataPriority;
+ private RpcExecutor indexCallExecutor;
+ private RpcExecutor metadataCallExecutor;
+ private int port;
+
+ public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority) {
+ // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
+ int maxQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+ float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
+ int numQueues = Math.max(1, Math.round(callQueuesHandlersFactor));
+
+ this.indexPriority = indexPriority;
+ this.metadataPriority = metadataPriority;
+ this.delegate = delegate;
+ this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", 1, numQueues, maxQueueLength);
+ this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", 1, numQueues, maxQueueLength);
+ }
+
+ @Override
+ public void init(Context context) {
+ delegate.init(context);
+ this.port = context.getListenerAddress().getPort();
+ }
+
+ @Override
+ public void start() {
+ delegate.start();
+ indexCallExecutor.start(port);
+ metadataCallExecutor.start(port);
+ }
+
+ @Override
+ public void stop() {
+ delegate.stop();
+ indexCallExecutor.stop();
+ metadataCallExecutor.stop();
+ }
+
+ @Override
+ public void dispatch(CallRunner callTask) throws InterruptedException, IOException {
+ RpcServer.Call call = callTask.getCall();
+ int priority = call.header.getPriority();
+ if (indexPriority == priority) {
+ indexCallExecutor.dispatch(callTask);
+ } else if (metadataPriority == priority) {
+ metadataCallExecutor.dispatch(callTask);
+ } else {
+ delegate.dispatch(callTask);
+ }
+ }
+
+ @Override
+ public int getGeneralQueueLength() {
+ // not the best way to calculate, but don't have a better way to hook
+ // into metrics at the moment
+ return this.delegate.getGeneralQueueLength() + this.indexCallExecutor.getQueueLength() + this.metadataCallExecutor.getQueueLength();
+ }
+
+ @Override
+ public int getPriorityQueueLength() {
+ return this.delegate.getPriorityQueueLength();
+ }
+
+ @Override
+ public int getReplicationQueueLength() {
+ return this.delegate.getReplicationQueueLength();
+ }
+
+ @Override
+ public int getActiveRpcHandlerCount() {
+ return this.delegate.getActiveRpcHandlerCount() + this.indexCallExecutor.getActiveHandlerCount() + this.metadataCallExecutor.getActiveHandlerCount();
+ }
+
+ @VisibleForTesting
+ public void setIndexExecutorForTesting(RpcExecutor executor) {
+ this.indexCallExecutor = executor;
+ }
+
+ @VisibleForTesting
+ public void setMetadataExecutorForTesting(RpcExecutor executor) {
+ this.metadataCallExecutor = executor;
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
new file mode 100644
index 0000000..a697382
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Factory to create a {@link PhoenixRpcScheduler}. In this package so we can access the
+ * {@link SimpleRpcSchedulerFactory}.
+ */
+public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixRpcSchedulerFactory.class);
+
+ private static final String VERSION_TOO_OLD_FOR_INDEX_RPC =
+ "Running an older version of HBase (less than 0.98.4), Phoenix index RPC handling cannot be enabled.";
+
+ @Override
+ public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction, Abortable abortable) {
+ // create the delegate scheduler
+ RpcScheduler delegate;
+ try {
+ // happens in <=0.98.4 where the scheduler factory is not visible
+ delegate = new SimpleRpcSchedulerFactory().create(conf, priorityFunction, abortable);
+ } catch (IllegalAccessError e) {
+ LOG.fatal(VERSION_TOO_OLD_FOR_INDEX_RPC);
+ throw e;
+ }
+
+ // get the index priority configs
+ int indexPriority = getIndexPriority(conf);
+ validatePriority(indexPriority);
+ // get the metadata priority configs
+ int metadataPriority = getMetadataPriority(conf);
+ validatePriority(metadataPriority);
+
+ // validate index and metadata priorities are not the same
+ Preconditions.checkArgument(indexPriority != metadataPriority, "Index and Metadata priority must not be same "+ indexPriority);
+ LOG.info("Using custom Phoenix Index RPC Handling with index rpc priority " + indexPriority + " and metadata rpc priority " + metadataPriority);
+
+ PhoenixRpcScheduler scheduler =
+ new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority);
+ return scheduler;
+ }
+
+ @Override
+ public RpcScheduler create(Configuration configuration, PriorityFunction priorityFunction) {
+ return create(configuration, priorityFunction, null);
+ }
+
+ /**
+ * Validates that the given priority does not overlap with the HBase priority range
+ */
+ private void validatePriority(int priority) {
+ Preconditions.checkArgument( priority < HConstants.NORMAL_QOS || priority > HConstants.HIGH_QOS, "priority cannot be within hbase priority range "
+ + HConstants.NORMAL_QOS +" to " + HConstants.HIGH_QOS );
+ }
+
+ public static int getIndexPriority(Configuration conf) {
+ return conf.getInt(QueryServices.INDEX_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_PRIORITY);
+ }
+
+ public static int getMetadataPriority(Configuration conf) {
+ return conf.getInt(QueryServices.METADATA_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_METADATA_PRIORITY);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
new file mode 100644
index 0000000..5a7dcc2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * {@link RpcControllerFactory} that sets the priority of metadata rpc calls to be processed
+ * in its own queue.
+ */
+public class ClientRpcControllerFactory extends RpcControllerFactory {
+
+ public ClientRpcControllerFactory(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public PayloadCarryingRpcController newController() {
+ PayloadCarryingRpcController delegate = super.newController();
+ return getController(delegate);
+ }
+
+ @Override
+ public PayloadCarryingRpcController newController(CellScanner cellScanner) {
+ PayloadCarryingRpcController delegate = super.newController(cellScanner);
+ return getController(delegate);
+ }
+
+ @Override
+ public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
+ PayloadCarryingRpcController delegate = super.newController(cellIterables);
+ return getController(delegate);
+ }
+
+ private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) {
+ return new MetadataRpcController(delegate, conf);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
new file mode 100644
index 0000000..fdb1d33
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+
+class IndexRpcController extends DelegatingPayloadCarryingRpcController {
+
+ private final int priority;
+ private final String tracingTableName;
+
+ public IndexRpcController(PayloadCarryingRpcController delegate, Configuration conf) {
+ super(delegate);
+ this.priority = PhoenixRpcSchedulerFactory.getIndexPriority(conf);
+ this.tracingTableName = conf.get(QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB,
+ QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME);
+ }
+
+ @Override
+ public void setPriority(final TableName tn) {
+ if (!tn.isSystemTable() && !tn.getNameAsString().equals(tracingTableName)) {
+ setPriority(this.priority);
+ }
+ else {
+ super.setPriority(tn);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
new file mode 100644
index 0000000..23b9f03
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+
+import com.google.common.collect.ImmutableList;
+
+class MetadataRpcController extends DelegatingPayloadCarryingRpcController {
+
+ private int priority;
+ // list of system tables
+ private static final List<String> SYSTEM_TABLE_NAMES = new ImmutableList.Builder<String>()
+ .add(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)
+ .add(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
+ .add(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME).build();
+
+ public MetadataRpcController(PayloadCarryingRpcController delegate,
+ Configuration conf) {
+ super(delegate);
+ this.priority = PhoenixRpcSchedulerFactory.getMetadataPriority(conf);
+ }
+
+ @Override
+ public void setPriority(final TableName tn) {
+ if (SYSTEM_TABLE_NAMES.contains(tn.getNameAsString())) {
+ setPriority(this.priority);
+ } else {
+ super.setPriority(tn);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerRpcControllerFactory.java
new file mode 100644
index 0000000..8c17eda
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerRpcControllerFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * {@link RpcControllerFactory} that sets the priority of index and metadata rpc calls
+ * so that they are each processed in their own queues
+ */
+public class ServerRpcControllerFactory extends RpcControllerFactory {
+
+ public ServerRpcControllerFactory(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public PayloadCarryingRpcController newController() {
+ PayloadCarryingRpcController delegate = super.newController();
+ return getController(delegate);
+ }
+
+ @Override
+ public PayloadCarryingRpcController newController(CellScanner cellScanner) {
+ PayloadCarryingRpcController delegate = super.newController(cellScanner);
+ return getController(delegate);
+ }
+
+ @Override
+ public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
+ PayloadCarryingRpcController delegate = super.newController(cellIterables);
+ return getController(delegate);
+ }
+
+ private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) {
+ // construct a chain of controllers: metadata, index and standard controller
+ IndexRpcController indexRpcController = new IndexRpcController(delegate, conf);
+ return new MetadataRpcController(indexRpcController, conf);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
deleted file mode 100644
index a192feb..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory;
-import org.apache.phoenix.util.SchemaUtil;
-
-/**
- * {@link RpcControllerFactory} that overrides the standard {@link PayloadCarryingRpcController} to
- * allow the configured index tables (via {@link #INDEX_TABLE_NAMES_KEY}) to use the Index priority.
- */
-public class IndexQosRpcControllerFactory extends RpcControllerFactory {
-
- public static final String INDEX_TABLE_NAMES_KEY = "phoenix.index.rpc.controller.index-tables";
-
- public IndexQosRpcControllerFactory(Configuration conf) {
- super(conf);
- }
-
- @Override
- public PayloadCarryingRpcController newController() {
- PayloadCarryingRpcController delegate = super.newController();
- return new IndexQosRpcController(delegate, conf);
- }
-
- @Override
- public PayloadCarryingRpcController newController(CellScanner cellScanner) {
- PayloadCarryingRpcController delegate = super.newController(cellScanner);
- return new IndexQosRpcController(delegate, conf);
- }
-
- @Override
- public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
- PayloadCarryingRpcController delegate = super.newController(cellIterables);
- return new IndexQosRpcController(delegate, conf);
- }
-
- private class IndexQosRpcController extends DelegatingPayloadCarryingRpcController {
-
- private int priority;
-
- public IndexQosRpcController(PayloadCarryingRpcController delegate, Configuration conf) {
- super(delegate);
- this.priority = PhoenixIndexRpcSchedulerFactory.getMinPriority(conf);
- }
- @Override
- public void setPriority(final TableName tn) {
- // if its an index table, then we override to the index priority
- if (!tn.isSystemTable() && !SchemaUtil.isSystemDataTable(tn.getNameAsString())) {
- setPriority(this.priority);
- }
- else {
- super.setPriority(tn);
- }
- }
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java
deleted file mode 100644
index 1789b0e..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.ipc;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ipc.PhoenixIndexRpcScheduler;
-import org.apache.hadoop.hbase.ipc.PriorityFunction;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
-import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Factory to create a {@link PhoenixIndexRpcScheduler}. In this package so we can access the
- * {@link SimpleRpcSchedulerFactory}.
- */
-public class PhoenixIndexRpcSchedulerFactory implements RpcSchedulerFactory {
-
- private static final Log LOG = LogFactory.getLog(PhoenixIndexRpcSchedulerFactory.class);
-
- private static final String VERSION_TOO_OLD_FOR_INDEX_RPC =
- "Running an older version of HBase (less than 0.98.4), Phoenix index RPC handling cannot be enabled.";
-
- @Override
- public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction, Abortable abortable) {
- // create the delegate scheduler
- RpcScheduler delegate;
- try {
- // happens in <=0.98.4 where the scheduler factory is not visible
- delegate = new SimpleRpcSchedulerFactory().create(conf, priorityFunction, abortable);
- } catch (IllegalAccessError e) {
- LOG.fatal(VERSION_TOO_OLD_FOR_INDEX_RPC);
- throw e;
- }
-
- int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
- int minPriority = getMinPriority(conf);
- int maxPriority = conf.getInt(QueryServices.MAX_INDEX_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MAX_PRIORITY);
- // make sure the ranges are outside the warning ranges
- Preconditions.checkArgument(maxPriority > minPriority, "Max index priority (" + maxPriority
- + ") must be larger than min priority (" + minPriority + ")");
- boolean allSmaller =
- minPriority < HConstants.REPLICATION_QOS
- && maxPriority < HConstants.REPLICATION_QOS;
- boolean allLarger = minPriority > HConstants.HIGH_QOS;
- Preconditions.checkArgument(allSmaller || allLarger, "Index priority range (" + minPriority
- + ", " + maxPriority + ") must be outside HBase priority range ("
- + HConstants.REPLICATION_QOS + ", " + HConstants.HIGH_QOS + ")");
-
- LOG.info("Using custom Phoenix Index RPC Handling with " + indexHandlerCount
- + " handlers and priority range [" + minPriority + ", " + maxPriority + ")");
-
- PhoenixIndexRpcScheduler scheduler =
- new PhoenixIndexRpcScheduler(indexHandlerCount, conf, delegate, minPriority,
- maxPriority);
- return scheduler;
- }
-
- @Override
- public RpcScheduler create(Configuration configuration, PriorityFunction priorityFunction) {
- return create(configuration, priorityFunction, null);
- }
-
- public static int getMinPriority(Configuration conf) {
- return conf.getInt(QueryServices.MIN_INDEX_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MIN_PRIORITY);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 15bcfd0..1b8b57d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -279,10 +279,6 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
/** Version below which we fall back on the generic KeyValueBuilder */
public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14");
- // list of system tables
- public static final List<String> SYSTEM_TABLE_NAMES = new ImmutableList.Builder<String>().add(SYSTEM_CATALOG_NAME)
- .add(SYSTEM_STATS_NAME).add(SEQUENCE_FULLNAME).build();
-
PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new PhoenixStatement(connection));
this.connection = connection;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 2eab5dd..65f6acf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -123,9 +123,8 @@ public interface QueryServices extends SQLCloseable {
// Index will be partially re-built from index disable time stamp - following overlap time
public static final String INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB =
"phoenix.index.failure.handling.rebuild.overlap.time";
- public static final String MIN_INDEX_PRIOIRTY_ATTRIB = "phoenix.regionserver.index.priority.min";
- public static final String MAX_INDEX_PRIOIRTY_ATTRIB = "phoenix.regionserver.index.priority.max";
- public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.regionserver.index.handler.count";
+ public static final String INDEX_PRIOIRTY_ATTRIB = "phoenix.index.rpc.priority";
+ public static final String METADATA_PRIOIRTY_ATTRIB = "phoenix.metadata.rpc.priority";
public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex";
// Config parameters for for configuring tracing
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 8cd740a..97040d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -41,6 +41,7 @@ import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LI
import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.METRICS_ENABLED;
import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
@@ -61,12 +62,13 @@ import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTR
import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.METRICS_ENABLED;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.trace.util.Tracing;
@@ -138,13 +140,12 @@ public class QueryServicesOptions {
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 10000; // 10 secs
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 300000; // 5 mins
- public static final int DEFAULT_INDEX_MAX_PRIORITY = 1050;
/**
* HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate
* and give some room for things in the middle
*/
- public static final int DEFAULT_INDEX_MIN_PRIORITY = 1000;
- public static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
+ public static final int DEFAULT_INDEX_PRIORITY = 1000;
+ public static final int DEFAULT_METADATA_PRIORITY = 2000;
public static final boolean DEFAULT_ALLOW_LOCAL_INDEX = true;
public static final int DEFAULT_TRACING_PAGE_SIZE = 100;
@@ -235,7 +236,8 @@ public class QueryServicesOptions {
.setIfUnset(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE)
.setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK)
.setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK)
- .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED);
+ .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED)
+ .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ClientRpcControllerFactory.class.getName());
;
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 4a8341d..46da726 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -404,13 +404,6 @@ public class SchemaUtil {
return false;
}
- /**
- * Returns true if the given table is a system table (does not include future system indexes)
- */
- public static boolean isSystemDataTable(String fullTableName) {
- return PhoenixDatabaseMetaData.SYSTEM_TABLE_NAMES.contains(fullTableName);
- }
-
// Given the splits and the rowKeySchema, find out the keys that
public static byte[][] processSplits(byte[][] splits, LinkedHashSet<PColumn> pkColumns, Integer saltBucketNum, boolean defaultRowKeyOrder) throws SQLException {
// FIXME: shouldn't this return if splits.length == 0?
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a7d7dfb5/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
index 8bd8c11..12f1863 100644
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
+++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
@@ -44,9 +44,9 @@ public class PhoenixIndexRpcSchedulerTest {
public void testIndexPriorityWritesToIndexHandler() throws Exception {
RpcScheduler mock = Mockito.mock(RpcScheduler.class);
- PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 200, 250);
+ PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250);
BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1, 1);
- scheduler.setExecutorForTesting(executor);
+ scheduler.setIndexExecutorForTesting(executor);
dispatchCallWithPriority(scheduler, 200);
List<BlockingQueue<CallRunner>> queues = executor.getQueues();
assertEquals(1, queues.size());
@@ -54,8 +54,8 @@ public class PhoenixIndexRpcSchedulerTest {
queue.poll(20, TimeUnit.SECONDS);
// try again, this time we tweak the ranges we support
- scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110);
- scheduler.setExecutorForTesting(executor);
+ scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110);
+ scheduler.setIndexExecutorForTesting(executor);
dispatchCallWithPriority(scheduler, 101);
queue.poll(20, TimeUnit.SECONDS);
@@ -71,14 +71,14 @@ public class PhoenixIndexRpcSchedulerTest {
@Test
public void testDelegateWhenOutsideRange() throws Exception {
RpcScheduler mock = Mockito.mock(RpcScheduler.class);
- PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 200, 250);
+ PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250);
dispatchCallWithPriority(scheduler, 100);
- dispatchCallWithPriority(scheduler, 250);
+ dispatchCallWithPriority(scheduler, 251);
// try again, this time we tweak the ranges we support
- scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110);
+ scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110);
dispatchCallWithPriority(scheduler, 200);
- dispatchCallWithPriority(scheduler, 110);
+ dispatchCallWithPriority(scheduler, 111);
Mockito.verify(mock, Mockito.times(4)).init(Mockito.any(Context.class));
Mockito.verify(mock, Mockito.times(4)).dispatch(Mockito.any(CallRunner.class));