You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2017/11/08 10:18:35 UTC
phoenix git commit: PHOENIX-4317 Update RPC controller to use the
updated APIs
Repository: phoenix
Updated Branches:
refs/heads/5.x-HBase-2.0 b989e55e0 -> 136c7a629
PHOENIX-4317 Update RPC controller to use the updated APIs
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/136c7a62
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/136c7a62
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/136c7a62
Branch: refs/heads/5.x-HBase-2.0
Commit: 136c7a62979167f93a9ce6d6bf7a47f34d73704b
Parents: b989e55
Author: Ankit Singhal <an...@gmail.com>
Authored: Wed Nov 8 15:46:26 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Wed Nov 8 15:46:26 2017 +0530
----------------------------------------------------------------------
.../hadoop/hbase/ipc/PhoenixRpcScheduler.java | 43 ++++++++++++++++----
.../hbase/ipc/PhoenixRpcSchedulerFactory.java | 5 +--
.../controller/ClientRpcControllerFactory.java | 16 ++++----
.../ipc/controller/IndexRpcController.java | 8 ++--
...erRegionServerIndexRpcControllerFactory.java | 16 ++++----
...egionServerMetadataRpcControllerFactory.java | 16 ++++----
.../ipc/controller/MetadataRpcController.java | 8 ++--
7 files changed, 69 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
index 7712cc6..ea4b431 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -42,22 +43,20 @@ public class PhoenixRpcScheduler extends RpcScheduler {
private RpcExecutor indexCallExecutor;
private RpcExecutor metadataCallExecutor;
private int port;
+
- public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority) {
+ public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority, PriorityFunction priorityFunction, Abortable abortable) {
// copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
int maxIndexQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, indexHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxMetadataQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, metadataHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
- float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
- int numIndexQueues = Math.max(1, Math.round(indexHandlerCount * callQueuesHandlersFactor));
- int numMetadataQueues = Math.max(1, Math.round(metadataHandlerCount * callQueuesHandlersFactor));
this.indexPriority = indexPriority;
this.metadataPriority = metadataPriority;
this.delegate = delegate;
- this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, numIndexQueues, maxIndexQueueLength);
- this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, numMetadataQueues, maxMetadataQueueLength);
+ this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, maxIndexQueueLength, priorityFunction,conf,abortable);
+ this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, maxMetadataQueueLength, priorityFunction,conf,abortable);
}
@Override
@@ -82,7 +81,7 @@ public class PhoenixRpcScheduler extends RpcScheduler {
@Override
public boolean dispatch(CallRunner callTask) throws InterruptedException, IOException {
- RpcServer.Call call = callTask.getCall();
+ ServerCall call = callTask.getCall();
int priority = call.header.getPriority();
if (indexPriority == priority) {
return indexCallExecutor.dispatch(callTask);
@@ -134,6 +133,36 @@ public class PhoenixRpcScheduler extends RpcScheduler {
public void setMetadataExecutorForTesting(RpcExecutor executor) {
this.metadataCallExecutor = executor;
}
+
+ @Override
+ public int getWriteQueueLength() {
+ return delegate.getWriteQueueLength();
+ }
+
+ @Override
+ public int getReadQueueLength() {
+ return delegate.getReadQueueLength();
+ }
+
+ @Override
+ public int getScanQueueLength() {
+ return delegate.getScanQueueLength();
+ }
+
+ @Override
+ public int getActiveWriteRpcHandlerCount() {
+ return delegate.getActiveWriteRpcHandlerCount();
+ }
+
+ @Override
+ public int getActiveReadRpcHandlerCount() {
+ return delegate.getActiveReadRpcHandlerCount();
+ }
+
+ @Override
+ public int getActiveScanRpcHandlerCount() {
+ return delegate.getActiveScanRpcHandlerCount();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
index a697382..06cf708 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
@@ -22,9 +22,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ipc.PriorityFunction;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
import org.apache.phoenix.query.QueryServices;
@@ -67,7 +64,7 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
LOG.info("Using custom Phoenix Index RPC Handling with index rpc priority " + indexPriority + " and metadata rpc priority " + metadataPriority);
PhoenixRpcScheduler scheduler =
- new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority);
+ new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority, priorityFunction,abortable);
return scheduler;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
index 5a7dcc2..f90d640 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
/**
@@ -36,24 +36,24 @@ public class ClientRpcControllerFactory extends RpcControllerFactory {
}
@Override
- public PayloadCarryingRpcController newController() {
- PayloadCarryingRpcController delegate = super.newController();
+ public HBaseRpcController newController() {
+ HBaseRpcController delegate = super.newController();
return getController(delegate);
}
@Override
- public PayloadCarryingRpcController newController(CellScanner cellScanner) {
- PayloadCarryingRpcController delegate = super.newController(cellScanner);
+ public HBaseRpcController newController(CellScanner cellScanner) {
+ HBaseRpcController delegate = super.newController(cellScanner);
return getController(delegate);
}
@Override
- public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
- PayloadCarryingRpcController delegate = super.newController(cellIterables);
+ public HBaseRpcController newController(List<CellScannable> cellIterables) {
+ HBaseRpcController delegate = super.newController(cellIterables);
return getController(delegate);
}
- private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) {
+ private HBaseRpcController getController(HBaseRpcController delegate) {
return new MetadataRpcController(delegate, conf);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
index 86c4490..b8976ce 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
@@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.ipc.controller;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -31,12 +31,12 @@ import com.google.protobuf.RpcController;
* {@link RpcController} that sets the appropriate priority of RPC calls destined for Phoenix index
* tables.
*/
-class IndexRpcController extends DelegatingPayloadCarryingRpcController {
+class IndexRpcController extends DelegatingHBaseRpcController {
private final int priority;
private final String tracingTableName;
- public IndexRpcController(PayloadCarryingRpcController delegate, Configuration conf) {
+ public IndexRpcController(HBaseRpcController delegate, Configuration conf) {
super(delegate);
this.priority = PhoenixRpcSchedulerFactory.getIndexPriority(conf);
this.tracingTableName = conf.get(QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java
index 89b49b7..5a7f75f 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
/**
@@ -39,24 +39,24 @@ public class InterRegionServerIndexRpcControllerFactory extends RpcControllerFac
}
@Override
- public PayloadCarryingRpcController newController() {
- PayloadCarryingRpcController delegate = super.newController();
+ public HBaseRpcController newController() {
+ HBaseRpcController delegate = super.newController();
return getController(delegate);
}
@Override
- public PayloadCarryingRpcController newController(CellScanner cellScanner) {
- PayloadCarryingRpcController delegate = super.newController(cellScanner);
+ public HBaseRpcController newController(CellScanner cellScanner) {
+ HBaseRpcController delegate = super.newController(cellScanner);
return getController(delegate);
}
@Override
- public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
- PayloadCarryingRpcController delegate = super.newController(cellIterables);
+ public HBaseRpcController newController(List<CellScannable> cellIterables) {
+ HBaseRpcController delegate = super.newController(cellIterables);
return getController(delegate);
}
- private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) {
+ private HBaseRpcController getController(HBaseRpcController delegate) {
// construct a chain of controllers: metadata, index and standard controller
IndexRpcController indexRpcController = new IndexRpcController(delegate, conf);
return new MetadataRpcController(indexRpcController, conf);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java
index ec4583b..37f3927 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
/**
@@ -37,24 +37,24 @@ public class InterRegionServerMetadataRpcControllerFactory extends RpcController
}
@Override
- public PayloadCarryingRpcController newController() {
- PayloadCarryingRpcController delegate = super.newController();
+ public HBaseRpcController newController() {
+ HBaseRpcController delegate = super.newController();
return getController(delegate);
}
@Override
- public PayloadCarryingRpcController newController(CellScanner cellScanner) {
- PayloadCarryingRpcController delegate = super.newController(cellScanner);
+ public HBaseRpcController newController(CellScanner cellScanner) {
+ HBaseRpcController delegate = super.newController(cellScanner);
return getController(delegate);
}
@Override
- public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
- PayloadCarryingRpcController delegate = super.newController(cellIterables);
+ public HBaseRpcController newController(List<CellScannable> cellIterables) {
+ HBaseRpcController delegate = super.newController(cellIterables);
return getController(delegate);
}
- private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) {
+ private HBaseRpcController getController(HBaseRpcController delegate) {
// construct a chain of controllers: metadata and delegate controller
return new MetadataRpcController(delegate, conf);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
index e8fab25..cbeabaa 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
@@ -21,8 +21,8 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.util.SchemaUtil;
@@ -34,7 +34,7 @@ import com.google.protobuf.RpcController;
* {@link RpcController} that sets the appropriate priority of RPC calls destined for Phoenix SYSTEM
* tables
*/
-class MetadataRpcController extends DelegatingPayloadCarryingRpcController {
+class MetadataRpcController extends DelegatingHBaseRpcController {
private int priority;
// list of system tables
@@ -53,7 +53,7 @@ class MetadataRpcController extends DelegatingPayloadCarryingRpcController {
.getNameAsString())
.build();
- public MetadataRpcController(PayloadCarryingRpcController delegate,
+ public MetadataRpcController(HBaseRpcController delegate,
Configuration conf) {
super(delegate);
this.priority = PhoenixRpcSchedulerFactory.getMetadataPriority(conf);