You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2022/07/21 16:08:20 UTC
[phoenix] branch master updated: PHOENIX-6687 The region server hosting the SYSTEM.CATALOG fails to serve any metadata requests as default handler pool threads are exhausted (#1440)
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 4f4d0863cc PHOENIX-6687 The region server hosting the SYSTEM.CATALOG fails to serve any metadata requests as default handler pool threads are exhausted (#1440)
4f4d0863cc is described below
commit 4f4d0863cca2ea7233aa871484be0763d32f0b33
Author: Jacob Isaac <ja...@gmail.com>
AuthorDate: Thu Jul 21 09:08:15 2022 -0700
PHOENIX-6687 The region server hosting the SYSTEM.CATALOG fails to serve any metadata requests as default handler pool threads are exhausted (#1440)
Co-authored-by: Jacob Isaac <ji...@salesforce.com>
Co-authored-by: Geoffrey Jacoby <gj...@apache.org>
---
.../end2end/ConnectionQueryServicesTestImpl.java | 5 +
.../SystemTablesCreationOnConnectionIT.java | 45 ++++++
.../hadoop/hbase/ipc/PhoenixRpcScheduler.java | 44 +++++-
.../hbase/ipc/PhoenixRpcSchedulerFactory.java | 11 +-
.../ipc/controller/MetadataRpcController.java | 3 +
.../controller/ServerSideRPCControllerFactory.java | 41 ++++++
.../controller/ServerToServerRpcController.java | 42 ++++++
...r.java => ServerToServerRpcControllerImpl.java} | 74 +++++-----
.../query/ChildLinkMetaDataServiceCallBack.java | 26 +++-
.../phoenix/query/ConnectionQueryServicesImpl.java | 156 +++++++++++++--------
.../org/apache/phoenix/query/QueryServices.java | 4 +-
.../apache/phoenix/query/QueryServicesOptions.java | 2 +
.../java/org/apache/phoenix/util/QueryUtil.java | 3 +-
.../hbase/ipc/PhoenixIndexRpcSchedulerTest.java | 51 ++++++-
14 files changed, 394 insertions(+), 113 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index 7e7b6b1236..4fdf47c538 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -28,6 +28,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.protobuf.RpcController;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.query.ConnectionQueryServicesImpl;
@@ -76,6 +77,10 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl
super.removeConnection(connection);
}
+ public RpcController getController() {
+ return super.getController();
+ }
+
@Override
public void close() throws SQLException {
try {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
index 76af21346a..3633617e1c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
@@ -49,6 +49,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcController;
import org.apache.phoenix.compat.hbase.CompatUtil;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -63,6 +66,7 @@ import org.apache.phoenix.query.ConnectionQueryServicesImpl;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesTestImpl;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.UpgradeUtil;
import org.junit.After;
@@ -217,7 +221,48 @@ public class SystemTablesCreationOnConnectionIT {
assertEquals(1, countUpgradeAttempts);
}
+ // Conditions: isDoNotUpgradePropSet is true
+ // Conditions: IS_SERVER_CONNECTION is true
+ // Expected: We do not create SYSTEM.CATALOG even if this is the first connection to the server
+ @Test
+ public void testGetConnectionOnServer() throws Exception {
+ startMiniClusterWithToggleNamespaceMapping(Boolean.FALSE.toString());
+ verifyCQSIUsingAppropriateRPCContoller(true);
+ }
+
+ // Conditions: isDoNotUpgradePropSet is true
+ // Conditions: IS_SERVER_CONNECTION is false
+ // Expected: We do not create SYSTEM.CATALOG even if this is the first connection to the server
+ @Test
+ public void testGetRegularConnection() throws Exception {
+ startMiniClusterWithToggleNamespaceMapping(Boolean.FALSE.toString());
+ verifyCQSIUsingAppropriateRPCContoller(false);
+ }
+
+ private void verifyCQSIUsingAppropriateRPCContoller(boolean isServerSideConnection) throws Exception {
+ Properties serverSideProperties = new Properties();
+ // Set doNotUpgradeProperty to true
+ UpgradeUtil.doNotUpgradeOnFirstConnection(serverSideProperties);
+ if (isServerSideConnection) {
+ QueryUtil.setServerConnection(serverSideProperties);
+ }
+ PhoenixTestDriver driver = new PhoenixTestDriver(ReadOnlyProps.EMPTY_PROPS);
+
+ ConnectionQueryServices cqsi = driver.getConnectionQueryServices(getJdbcUrl(), serverSideProperties);
+ assertTrue(cqsi instanceof ConnectionQueryServicesTestImpl);
+ ConnectionQueryServicesTestImpl testCQSI = (ConnectionQueryServicesTestImpl)cqsi;
+ if (isServerSideConnection) {
+ assertTrue( testCQSI.getController() instanceof ServerToServerRpcController);
+ } else {
+ assertTrue( testCQSI.getController() instanceof ServerRpcController);
+ }
+ assertTrue(testCQSI.isUpgradeRequired());
+ hbaseTables = getHBaseTables();
+ assertFalse(hbaseTables.contains(PHOENIX_SYSTEM_CATALOG) ||
+ hbaseTables.contains(PHOENIX_NAMESPACE_MAPPED_SYSTEM_CATALOG));
+ assertEquals(0, hbaseTables.size());
+ }
/********************* Testing SYSTEM.CATALOG/SYSTEM:CATALOG creation/upgrade behavior for subsequent connections *********************/
// We are ignoring this test because we aren't testing SYSCAT timestamp anymore if
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
index b2a7503538..6d816ec17d 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
@@ -40,23 +40,29 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
private int indexPriority;
private int metadataPriority;
+ private int serverSidePriority;
private RpcExecutor indexCallExecutor;
private RpcExecutor metadataCallExecutor;
+ private RpcExecutor serverSideCallExecutor;
private int port;
- public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority, PriorityFunction priorityFunction, Abortable abortable) {
+ public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority, int serversidePriority, PriorityFunction priorityFunction, Abortable abortable) {
// copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
- int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
+ int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_METADATA_HANDLER_COUNT);
+ int serverSideHandlerCount = conf.getInt(QueryServices.SERVER_SIDE_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_SERVERSIDE_HANDLER_COUNT);
int maxIndexQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, indexHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxMetadataQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, metadataHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+ int maxServerSideQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, serverSideHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
this.indexPriority = indexPriority;
this.metadataPriority = metadataPriority;
+ this.serverSidePriority = serversidePriority;
this.delegate = delegate;
this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, maxIndexQueueLength, priorityFunction,conf,abortable);
this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, maxMetadataQueueLength, priorityFunction,conf,abortable);
+ this.serverSideCallExecutor = new BalancedQueueRpcExecutor("ServerSide", serverSideHandlerCount, maxServerSideQueueLength, priorityFunction,conf,abortable);
}
@Override
@@ -70,6 +76,7 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
delegate.start();
indexCallExecutor.start(port);
metadataCallExecutor.start(port);
+ serverSideCallExecutor.start(port);
}
@Override
@@ -77,6 +84,7 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
delegate.stop();
indexCallExecutor.stop();
metadataCallExecutor.stop();
+ serverSideCallExecutor.stop();
}
@Override
@@ -87,6 +95,8 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
return indexCallExecutor.dispatch(callTask);
} else if (metadataPriority == priority) {
return metadataCallExecutor.dispatch(callTask);
+ } else if (serverSidePriority == priority) {
+ return serverSideCallExecutor.dispatch(callTask);
} else {
return delegate.dispatch(callTask);
}
@@ -101,7 +111,10 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
public int getGeneralQueueLength() {
// not the best way to calculate, but don't have a better way to hook
// into metrics at the moment
- return this.delegate.getGeneralQueueLength() + this.indexCallExecutor.getQueueLength() + this.metadataCallExecutor.getQueueLength();
+ return this.delegate.getGeneralQueueLength()
+ + this.indexCallExecutor.getQueueLength()
+ + this.metadataCallExecutor.getQueueLength()
+ + this.serverSideCallExecutor.getQueueLength();
}
@Override
@@ -116,7 +129,10 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
@Override
public int getActiveRpcHandlerCount() {
- return this.delegate.getActiveRpcHandlerCount() + this.indexCallExecutor.getActiveHandlerCount() + this.metadataCallExecutor.getActiveHandlerCount();
+ return this.delegate.getActiveRpcHandlerCount()
+ + this.indexCallExecutor.getActiveHandlerCount()
+ + this.metadataCallExecutor.getActiveHandlerCount()
+ + this.serverSideCallExecutor.getActiveHandlerCount();
}
@Override
@@ -139,6 +155,26 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler {
this.metadataCallExecutor = executor;
}
+ @VisibleForTesting
+ public void setServerSideExecutorForTesting(RpcExecutor executor) {
+ this.serverSideCallExecutor = executor;
+ }
+
+ @VisibleForTesting
+ public RpcExecutor getIndexExecutorForTesting() {
+ return this.indexCallExecutor;
+ }
+
+ @VisibleForTesting
+ public RpcExecutor getMetadataExecutorForTesting() {
+ return this.metadataCallExecutor;
+ }
+
+ @VisibleForTesting
+ public RpcExecutor getServerSideExecutorForTesting() {
+ return this.serverSideCallExecutor;
+ }
+
@Override
public int getWriteQueueLength() {
return delegate.getWriteQueueLength();
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
index 57c0d19685..74adf01573 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
@@ -58,6 +58,9 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
// get the metadata priority configs
int metadataPriority = getMetadataPriority(conf);
validatePriority(metadataPriority);
+ // get the server side priority configs
+ int serverSidePriority = getServerSidePriority(conf);
+ validatePriority(serverSidePriority);
// validate index and metadata priorities are not the same
Preconditions.checkArgument(indexPriority != metadataPriority, "Index and Metadata priority must not be same "+ indexPriority);
@@ -65,7 +68,7 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
+ indexPriority + " and metadata rpc priority " + metadataPriority);
PhoenixRpcScheduler scheduler =
- new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority, priorityFunction,abortable);
+ new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority, serverSidePriority, priorityFunction,abortable);
return scheduler;
}
@@ -89,5 +92,9 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
public static int getMetadataPriority(Configuration conf) {
return conf.getInt(QueryServices.METADATA_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_METADATA_PRIORITY);
}
-
+
+ public static int getServerSidePriority(Configuration conf) {
+ return conf.getInt(QueryServices.SERVER_SIDE_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_SERVER_SIDE_PRIORITY);
+ }
+
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
index ef135f0cb1..2800ca461d 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
@@ -43,6 +43,7 @@ class MetadataRpcController extends DelegatingHBaseRpcController {
.add(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
.add(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME)
.add(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME)
+ .add(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME)
.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, true)
.getNameAsString())
.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, true)
@@ -51,6 +52,8 @@ class MetadataRpcController extends DelegatingHBaseRpcController {
.getNameAsString())
.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES, true)
.getNameAsString())
+ .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES, true)
+ .getNameAsString())
.build();
public MetadataRpcController(HBaseRpcController delegate,
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
new file mode 100644
index 0000000000..ba7fb6339d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.controller;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link RpcControllerFactory} that should only be used when
+ * making server-server remote RPCs to the region servers hosting Phoenix SYSTEM tables.
+ */
+public class ServerSideRPCControllerFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ServerSideRPCControllerFactory.class);
+ protected final Configuration conf;
+
+ public ServerSideRPCControllerFactory(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public ServerToServerRpcController newController() {
+ return new ServerToServerRpcControllerImpl(this.conf);
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerToServerRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerToServerRpcController.java
new file mode 100644
index 0000000000..4916168b9d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerToServerRpcController.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc.controller;
+
+import com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+
+public interface ServerToServerRpcController extends RpcController {
+
+ /**
+ * @param priority Priority for this request; should fall roughly in the range
+ * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
+ */
+ void setPriority(int priority);
+
+ /**
+ * @param tn Set priority based off the table we are going against.
+ */
+ void setPriority(final TableName tn);
+
+ /**
+ * @return The priority of this request
+ */
+ int getPriority();
+}
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerToServerRpcControllerImpl.java
similarity index 53%
copy from phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
copy to phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerToServerRpcControllerImpl.java
index ef135f0cb1..b6a27cd4b0 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerToServerRpcControllerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,57 +15,61 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hbase.ipc.controller;
-import java.util.List;
-
+import com.google.protobuf.RpcController;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
-import com.google.protobuf.RpcController;
+import java.util.List;
/**
- * {@link RpcController} that sets the appropriate priority of RPC calls destined for Phoenix SYSTEM
- * tables
+ * {@link RpcController} that sets the appropriate priority of server-server RPC calls destined
+ * for Phoenix SYSTEM tables.
*/
-class MetadataRpcController extends DelegatingHBaseRpcController {
+public class ServerToServerRpcControllerImpl extends ServerRpcController implements
+ ServerToServerRpcController {
- private int priority;
- // list of system tables
- private static final List<String> SYSTEM_TABLE_NAMES = new ImmutableList.Builder<String>()
- .add(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)
- .add(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
- .add(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME)
- .add(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME)
+ private int priority;
+ // list of system tables that can possibly have server-server rpc's
+ private static final List<String> SYSTEM_TABLE_NAMES = new ImmutableList.Builder<String>()
+ .add(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)
+ .add(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME)
+ .add(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME)
.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, true)
.getNameAsString())
- .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, true)
- .getNameAsString())
- .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES, true)
+ .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES, true)
.getNameAsString())
- .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES, true)
+ .add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME_BYTES, true)
.getNameAsString())
.build();
- public MetadataRpcController(HBaseRpcController delegate,
- Configuration conf) {
- super(delegate);
- this.priority = PhoenixRpcSchedulerFactory.getMetadataPriority(conf);
- }
+ public ServerToServerRpcControllerImpl(
+ Configuration conf) {
+ super();
+ this.priority = PhoenixRpcSchedulerFactory.getServerSidePriority(conf);
+ }
+
+ @Override
+ public void setPriority(final TableName tn) {
+ if (SYSTEM_TABLE_NAMES.contains(tn.getNameAsString())) {
+ setPriority(this.priority);
+ }
+ }
+
+
+ @Override public void setPriority(int priority) {
+ this.priority = priority;
+ }
- @Override
- public void setPriority(final TableName tn) {
- if (SYSTEM_TABLE_NAMES.contains(tn.getNameAsString())) {
- setPriority(this.priority);
- } else {
- super.setPriority(tn);
- }
- }
+ @Override public int getPriority() {
+ return this.priority;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ChildLinkMetaDataServiceCallBack.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ChildLinkMetaDataServiceCallBack.java
index 73373ef256..ab16a02c63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ChildLinkMetaDataServiceCallBack.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ChildLinkMetaDataServiceCallBack.java
@@ -18,9 +18,11 @@
package org.apache.phoenix.query;
+import com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos
@@ -41,15 +43,16 @@ class ChildLinkMetaDataServiceCallBack
implements Batch.Call<ChildLinkMetaDataService, MetaDataResponse> {
private final List<Mutation> childLinkMutations;
+ private final RpcController controller;
- public ChildLinkMetaDataServiceCallBack(List<Mutation> childLinkMutations) {
+ public ChildLinkMetaDataServiceCallBack(RpcController controller, List<Mutation> childLinkMutations) {
+ this.controller = controller;
this.childLinkMutations = childLinkMutations;
}
@Override
public MetaDataResponse call(ChildLinkMetaDataService instance)
throws IOException {
- ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<>();
CreateViewAddChildLinkRequest.Builder builder =
@@ -60,9 +63,22 @@ class ChildLinkMetaDataServiceCallBack
}
CreateViewAddChildLinkRequest build = builder.build();
instance.createViewAddChildLink(controller, build, rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
+
+ private void checkForRemoteExceptions(RpcController controller) throws IOException {
+ if (controller != null) {
+ if (controller instanceof ServerRpcController) {
+ if (((ServerRpcController)controller).getFailedOn() != null) {
+ throw ((ServerRpcController)controller).getFailedOn();
+ }
+ } else {
+ if (((HBaseRpcController)controller).getFailed() != null) {
+ throw ((HBaseRpcController)controller).getFailed();
+ }
+ }
+ }
+ }
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 4fc2f1ede3..1a5cb1cf52 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -49,6 +49,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAM
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME;
@@ -63,6 +65,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_FOR_MUTEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
@@ -124,6 +127,7 @@ import java.util.regex.Pattern;
import javax.annotation.concurrent.GuardedBy;
+import com.google.protobuf.RpcController;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
@@ -152,8 +156,11 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcController;
+import org.apache.hadoop.hbase.ipc.controller.ServerSideRPCControllerFactory;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
import org.apache.hadoop.hbase.security.AccessDeniedException;
@@ -382,6 +389,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private final int maxInternalConnectionsAllowed;
private final boolean shouldThrottleNumConnections;
public static final byte[] MUTEX_LOCKED = "MUTEX_LOCKED".getBytes(StandardCharsets.UTF_8);
+ private ServerSideRPCControllerFactory serverSideRPCControllerFactory;
private static interface FeatureSupported {
boolean isSupported(ConnectionQueryServices services);
@@ -434,6 +442,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// Without making a copy of the configuration we cons up, we lose some of our properties
// on the server side during testing.
this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
+ //Set the rpcControllerFactory if it is a server side connnection.
+ boolean isServerSideConnection = config.getBoolean(QueryUtil.IS_SERVER_CONNECTION, false);
+ if (isServerSideConnection) {
+ this.serverSideRPCControllerFactory = new ServerSideRPCControllerFactory(config);
+ }
// set replication required parameter
ConfigUtil.setReplicationConfigIfAbsent(this.config);
this.props = new ReadOnlyProps(this.config.iterator());
@@ -1662,7 +1675,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
@Override
public GetVersionResponse call(MetaDataService instance)
throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController();
BlockingRpcCallback<GetVersionResponse> rpcCallback =
new BlockingRpcCallback<>();
GetVersionRequest.Builder builder =
@@ -1671,9 +1684,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.getVersion(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
});
@@ -1784,6 +1795,50 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
+ @VisibleForTesting
+ protected RpcController getController() {
+ return getController(SYSTEM_CATALOG_HBASE_TABLE_NAME);
+ }
+
+ /**
+ * If configured to use the server-server metadata handler pool for server side connections,
+ * use the {@link org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcController}
+ * else use the ordinary handler pool {@link ServerRpcController}
+ *
+ * return the rpcController to use
+ * @return
+ */
+ @VisibleForTesting
+ protected RpcController getController(TableName systemTableName) {
+ if (serverSideRPCControllerFactory != null) {
+ ServerToServerRpcController controller = serverSideRPCControllerFactory.newController();
+ controller.setPriority(systemTableName);
+ return controller;
+ } else {
+ return new ServerRpcController();
+ }
+ }
+
+ /**
+ * helper function to return the exception from the RPC
+ * @param controller
+ * @throws IOException
+ */
+
+ private void checkForRemoteExceptions(RpcController controller) throws IOException {
+ if (controller != null) {
+ if (controller instanceof ServerRpcController) {
+ if (((ServerRpcController)controller).getFailedOn() != null) {
+ throw ((ServerRpcController)controller).getFailedOn();
+ }
+ } else {
+ if (((HBaseRpcController)controller).getFailed() != null) {
+ throw ((HBaseRpcController)controller).getFailed();
+ }
+ }
+ }
+ }
+
/**
* Invoke meta data coprocessor with one retry if the key was found to not be in the regions
* (due to a table split)
@@ -1812,8 +1867,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
Table ht = this.getTable(SchemaUtil.getPhysicalName(systemTableName, this.getProps()).getName());
try {
- results =
- ht.coprocessorService(MetaDataService.class, tableKey, tableKey, callable);
+ results = ht.coprocessorService(MetaDataService.class, tableKey, tableKey, callable);
assert(results.size() == 1);
MetaDataResponse result = results.values().iterator().next();
@@ -2003,9 +2057,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// We invoke this using rowKey available in the first element
// of childLinkMutations.
final byte[] rowKey = childLinkMutations.get(0).getRow();
+ final RpcController controller = getController(PhoenixDatabaseMetaData.SYSTEM_LINK_HBASE_TABLE_NAME);
final MetaDataMutationResult result =
childLinkMetaDataCoprocessorExec(rowKey,
- new ChildLinkMetaDataServiceCallBack(childLinkMutations));
+ new ChildLinkMetaDataServiceCallBack(controller, childLinkMutations));
switch (result.getMutationCode()) {
case UNABLE_TO_CREATE_CHILD_LINK:
@@ -2025,7 +2080,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<>();
CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
@@ -2042,9 +2097,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
CreateTableRequest build = builder.build();
instance.createTable(controller, build, rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
});
@@ -2061,7 +2114,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
GetTableRequest.Builder builder = GetTableRequest.newBuilder();
@@ -2072,9 +2125,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
builder.setClientTimestamp(clientTimestamp);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.getTable(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
});
@@ -2096,7 +2147,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
DropTableRequest.Builder builder = DropTableRequest.newBuilder();
@@ -2108,9 +2159,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
builder.setCascade(cascade);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.dropTable(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
});
@@ -2183,7 +2232,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController(SYSTEM_FUNCTION_HBASE_TABLE_NAME);
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
DropFunctionRequest.Builder builder = DropFunctionRequest.newBuilder();
@@ -2194,12 +2243,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
builder.setIfExists(ifExists);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.dropFunction(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
- }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
+ }, SYSTEM_FUNCTION_NAME_BYTES);
return result;
}
@@ -2404,7 +2451,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
@@ -2420,9 +2467,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
builder.setAddingColumns(addingColumns);
instance.addColumn(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
});
@@ -3122,7 +3167,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
DropColumnRequest.Builder builder = DropColumnRequest.newBuilder();
@@ -3134,9 +3179,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (parentTable!=null)
builder.setParentTable(PTableImpl.toProto(parentTable));
instance.dropColumn(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
});
@@ -4937,12 +4980,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
this.getProps()).getName())) {
try {
startTime = EnvironmentEdgeManager.currentTimeMillis();
- results =
- htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
+ results = htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, Long>() {
@Override
public Long call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController();
BlockingRpcCallback<ClearCacheResponse> rpcCallback =
new BlockingRpcCallback<ClearCacheResponse>();
ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder();
@@ -4950,13 +4992,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.clearCache(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
+ checkForRemoteExceptions(controller);
return rpcCallback.get().getUnfreedBytes();
}
});
- TableMetricsManager.updateMetricsForSystemCatalogTableMethod(null, NUM_SYSTEM_TABLE_RPC_SUCCESS, 1);
+ TableMetricsManager.updateMetricsForSystemCatalogTableMethod(null,NUM_SYSTEM_TABLE_RPC_SUCCESS, 1);
} catch(Throwable e) {
TableMetricsManager.updateMetricsForSystemCatalogTableMethod(null, NUM_SYSTEM_TABLE_RPC_FAILURES, 1);
throw ServerUtil.parseServerException(e);
@@ -5023,7 +5063,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController();
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder();
@@ -5033,9 +5073,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.updateIndexState(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
});
@@ -5295,7 +5333,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, ClearTableFromCacheResponse>() {
@Override
public ClearTableFromCacheResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController();
BlockingRpcCallback<ClearTableFromCacheResponse> rpcCallback = new BlockingRpcCallback<ClearTableFromCacheResponse>();
ClearTableFromCacheRequest.Builder builder = ClearTableFromCacheRequest.newBuilder();
builder.setTenantId(ByteStringer.wrap(tenantId));
@@ -5304,7 +5342,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
builder.setClientTimestamp(clientTS);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.clearTableFromCache(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
});
@@ -5615,7 +5653,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController(SYSTEM_FUNCTION_HBASE_TABLE_NAME);
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
GetFunctionsRequest.Builder builder = GetFunctionsRequest.newBuilder();
@@ -5627,12 +5665,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
builder.setClientTimestamp(clientTimestamp);
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.getFunctions(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
- }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
+ }, SYSTEM_FUNCTION_NAME_BYTES);
}
@@ -5642,7 +5678,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController();
BlockingRpcCallback<MetaDataResponse> rpcCallback = new BlockingRpcCallback<MetaDataResponse>();
GetSchemaRequest.Builder builder = GetSchemaRequest.newBuilder();
builder.setSchemaName(schemaName);
@@ -5650,7 +5686,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION,
PHOENIX_PATCH_NUMBER));
instance.getSchema(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
});
@@ -5672,7 +5708,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController(SYSTEM_FUNCTION_HBASE_TABLE_NAME);
BlockingRpcCallback<MetaDataResponse> rpcCallback =
new BlockingRpcCallback<MetaDataResponse>();
CreateFunctionRequest.Builder builder = CreateFunctionRequest.newBuilder();
@@ -5684,12 +5720,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
builder.setReplace(function.isReplace());
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
instance.createFunction(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
- }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
+ }, SYSTEM_FUNCTION_NAME_BYTES);
return result;
}
@@ -5861,7 +5895,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController();
BlockingRpcCallback<MetaDataResponse> rpcCallback = new BlockingRpcCallback<MetaDataResponse>();
CreateSchemaRequest.Builder builder = CreateSchemaRequest.newBuilder();
for (Mutation m : schemaMutations) {
@@ -5872,7 +5906,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION,
PHOENIX_PATCH_NUMBER));
instance.createSchema(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
});
@@ -5896,7 +5930,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
new Batch.Call<MetaDataService, MetaDataResponse>() {
@Override
public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = getController();
BlockingRpcCallback<MetaDataResponse> rpcCallback = new BlockingRpcCallback<MetaDataResponse>();
DropSchemaRequest.Builder builder = DropSchemaRequest.newBuilder();
for (Mutation m : schemaMetaData) {
@@ -5907,7 +5941,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION,
PHOENIX_PATCH_NUMBER));
instance.dropSchema(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+ checkForRemoteExceptions(controller);
return rpcCallback.get();
}
});
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index e51c680862..406c4a1a65 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -179,6 +179,7 @@ public interface QueryServices extends SQLCloseable {
"phoenix.index.failure.handling.rebuild.overlap.forward.time";
public static final String INDEX_PRIOIRTY_ATTRIB = "phoenix.index.rpc.priority";
public static final String METADATA_PRIOIRTY_ATTRIB = "phoenix.metadata.rpc.priority";
+ public static final String SERVER_SIDE_PRIOIRTY_ATTRIB = "phoenix.serverside.rpc.priority";
public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex";
// Retries when doing server side writes to SYSTEM.CATALOG
@@ -243,7 +244,8 @@ public interface QueryServices extends SQLCloseable {
// rpc queue configs
public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count";
-
+ public static final String SERVER_SIDE_HANDLER_COUNT_ATTRIB = "phoenix.rpc.serverside.handler.count";
+
public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder";
public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions";
public static final String COLLECT_REQUEST_LEVEL_METRICS = "phoenix.query.request.metrics.enabled";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 6c8ada9e6e..849f023be6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -224,11 +224,13 @@ public class QueryServicesOptions {
* HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate
* and give some room for things in the middle
*/
+ public static final int DEFAULT_SERVER_SIDE_PRIORITY = 500;
public static final int DEFAULT_INDEX_PRIORITY = 1000;
public static final int DEFAULT_METADATA_PRIORITY = 2000;
public static final boolean DEFAULT_ALLOW_LOCAL_INDEX = true;
public static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
public static final int DEFAULT_METADATA_HANDLER_COUNT = 30;
+ public static final int DEFAULT_SERVERSIDE_HANDLER_COUNT = 30;
public static final int DEFAULT_SYSTEM_MAX_VERSIONS = 1;
public static final boolean DEFAULT_SYSTEM_KEEP_DELETED_CELLS = false;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 5c7816ef3f..b409f056fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -420,7 +420,8 @@ public final class QueryUtil {
private static Connection getConnection(Properties props, Configuration conf)
throws SQLException {
String url = getConnectionUrl(props, conf);
- LOGGER.info("Creating connection with the jdbc url: " + url);
+ LOGGER.info(String.format("Creating connection with the jdbc url: %s, isServerSide = %s",
+ url, props.getProperty(IS_SERVER_CONNECTION)));
props = PropertiesUtil.combineProperties(props, conf);
return DriverManager.getConnection(url, props);
}
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
index 031c679559..15b83e41b5 100644
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
+++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
@@ -67,7 +67,7 @@ public class PhoenixIndexRpcSchedulerTest {
RpcScheduler mock = Mockito.mock(RpcScheduler.class);
PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
Abortable abortable = new AbortServer();
- PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250,qosFunction,abortable);
+ PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, qosFunction,abortable);
BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1,qosFunction,conf,abortable);
scheduler.setIndexExecutorForTesting(executor);
dispatchCallWithPriority(scheduler, 200);
@@ -77,7 +77,7 @@ public class PhoenixIndexRpcSchedulerTest {
queue.poll(20, TimeUnit.SECONDS);
// try again, this time we tweak the ranges we support
- scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110,qosFunction,abortable);
+ scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, qosFunction,abortable);
scheduler.setIndexExecutorForTesting(executor);
dispatchCallWithPriority(scheduler, 101);
queue.poll(20, TimeUnit.SECONDS);
@@ -87,6 +87,49 @@ public class PhoenixIndexRpcSchedulerTest {
executor.stop();
}
+ @Test
+ public void testServerSideRPCalls() throws Exception {
+ RpcScheduler mock = Mockito.mock(RpcScheduler.class);
+ PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
+ Abortable abortable = new AbortServer();
+ PhoenixRpcScheduler scheduler1 = new PhoenixRpcScheduler(conf, mock, 200, 250, 100, qosFunction,abortable);
+ RpcExecutor executor1 = scheduler1.getServerSideExecutorForTesting();
+ for (int c = 0; c < 10; c++) {
+ dispatchCallWithPriority(scheduler1, 100);
+ }
+ List<BlockingQueue<CallRunner>> queues1 = executor1.getQueues();
+ int numDispatches1 = 0;
+ for (BlockingQueue<CallRunner> queue1 : queues1) {
+ if (queue1.size() > 0) {
+ numDispatches1 += queue1.size();
+ for (int i = 0; i < queue1.size(); i++) {
+ queue1.poll(20, TimeUnit.SECONDS);
+ }
+ }
+ }
+ assertEquals(10, numDispatches1);
+ scheduler1.stop();
+
+ // try again, with the incorrect executor
+ PhoenixRpcScheduler scheduler2 = new PhoenixRpcScheduler(conf, mock, 101, 110, 50, qosFunction,abortable);
+ RpcExecutor executor2 = scheduler2.getIndexExecutorForTesting();
+ dispatchCallWithPriority(scheduler2, 50);
+ List<BlockingQueue<CallRunner>> queues2 = executor2.getQueues();
+ int numDispatches2 = 0;
+ for (BlockingQueue<CallRunner> queue2 : queues2) {
+ if (queue2.size() > 0) {
+ numDispatches2++;
+ queue2.poll(20, TimeUnit.SECONDS);
+ }
+ }
+ assertEquals(0, numDispatches2);
+ scheduler2.stop();
+
+ Mockito.verify(mock, Mockito.times(numDispatches1+1)).init(Mockito.any(Context.class));
+ //Verify no dispatches to the default delegate handler
+ Mockito.verify(mock, Mockito.times(0)).dispatch(Mockito.any(CallRunner.class));
+ }
+
/**
* Test that we delegate to the passed {@link RpcScheduler} when the call priority is outside
* the index range
@@ -97,12 +140,12 @@ public class PhoenixIndexRpcSchedulerTest {
PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
Abortable abortable = new AbortServer();
RpcScheduler mock = Mockito.mock(RpcScheduler.class);
- PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250,qosFunction,abortable);
+ PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, qosFunction,abortable);
dispatchCallWithPriority(scheduler, 100);
dispatchCallWithPriority(scheduler, 251);
// try again, this time we tweak the ranges we support
- scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110,qosFunction,abortable);
+ scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, qosFunction,abortable);
dispatchCallWithPriority(scheduler, 200);
dispatchCallWithPriority(scheduler, 111);