You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2017/11/08 10:18:35 UTC

phoenix git commit: PHOENIX-4317 Update RPC controller to use the updated APIs

Repository: phoenix
Updated Branches:
  refs/heads/5.x-HBase-2.0 b989e55e0 -> 136c7a629


PHOENIX-4317 Update RPC controller to use the updated APIs


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/136c7a62
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/136c7a62
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/136c7a62

Branch: refs/heads/5.x-HBase-2.0
Commit: 136c7a62979167f93a9ce6d6bf7a47f34d73704b
Parents: b989e55
Author: Ankit Singhal <an...@gmail.com>
Authored: Wed Nov 8 15:46:26 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Wed Nov 8 15:46:26 2017 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/PhoenixRpcScheduler.java   | 43 ++++++++++++++++----
 .../hbase/ipc/PhoenixRpcSchedulerFactory.java   |  5 +--
 .../controller/ClientRpcControllerFactory.java  | 16 ++++----
 .../ipc/controller/IndexRpcController.java      |  8 ++--
 ...erRegionServerIndexRpcControllerFactory.java | 16 ++++----
 ...egionServerMetadataRpcControllerFactory.java | 16 ++++----
 .../ipc/controller/MetadataRpcController.java   |  8 ++--
 7 files changed, 69 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
index 7712cc6..ea4b431 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 
@@ -42,22 +43,20 @@ public class PhoenixRpcScheduler extends RpcScheduler {
     private RpcExecutor indexCallExecutor;
     private RpcExecutor metadataCallExecutor;
     private int port;
+    
 
-    public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority) {
+    public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority, PriorityFunction priorityFunction, Abortable abortable) {
         // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
     	int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
     	int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
         int maxIndexQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, indexHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
         int maxMetadataQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, metadataHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
-        float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
-        int numIndexQueues = Math.max(1, Math.round(indexHandlerCount * callQueuesHandlersFactor));
-        int numMetadataQueues = Math.max(1, Math.round(metadataHandlerCount * callQueuesHandlersFactor));
 
         this.indexPriority = indexPriority;
         this.metadataPriority = metadataPriority;
         this.delegate = delegate;
-        this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, numIndexQueues, maxIndexQueueLength);
-        this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, numMetadataQueues, maxMetadataQueueLength);
+        this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, maxIndexQueueLength, priorityFunction,conf,abortable);
+        this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, maxMetadataQueueLength, priorityFunction,conf,abortable);
     }
 
     @Override
@@ -82,7 +81,7 @@ public class PhoenixRpcScheduler extends RpcScheduler {
 
     @Override
     public boolean dispatch(CallRunner callTask) throws InterruptedException, IOException {
-        RpcServer.Call call = callTask.getCall();
+        ServerCall call = callTask.getCall();
         int priority = call.header.getPriority();
         if (indexPriority == priority) {
             return indexCallExecutor.dispatch(callTask);
@@ -134,6 +133,36 @@ public class PhoenixRpcScheduler extends RpcScheduler {
     public void setMetadataExecutorForTesting(RpcExecutor executor) {
         this.metadataCallExecutor = executor;
     }
+
+    @Override
+    public int getWriteQueueLength() {
+        return delegate.getWriteQueueLength();
+    }
+
+    @Override
+    public int getReadQueueLength() {
+        return delegate.getReadQueueLength();
+    }
+
+    @Override
+    public int getScanQueueLength() {
+        return delegate.getScanQueueLength();
+    }
+
+    @Override
+    public int getActiveWriteRpcHandlerCount() {
+        return delegate.getActiveWriteRpcHandlerCount();
+    }
+
+    @Override
+    public int getActiveReadRpcHandlerCount() {
+        return delegate.getActiveReadRpcHandlerCount();
+    }
+
+    @Override
+    public int getActiveScanRpcHandlerCount() {
+        return delegate.getActiveScanRpcHandlerCount();
+    }
     
     
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
index a697382..06cf708 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java
@@ -22,9 +22,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ipc.PriorityFunction;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
 import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
 import org.apache.phoenix.query.QueryServices;
@@ -67,7 +64,7 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory {
         LOG.info("Using custom Phoenix Index RPC Handling with index rpc priority " + indexPriority + " and metadata rpc priority " + metadataPriority);
 
         PhoenixRpcScheduler scheduler =
-                new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority);
+                new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority, priorityFunction,abortable);
         return scheduler;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
index 5a7dcc2..f90d640 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java
@@ -22,7 +22,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 
 /**
@@ -36,24 +36,24 @@ public class ClientRpcControllerFactory extends RpcControllerFactory {
     }
 
     @Override
-    public PayloadCarryingRpcController newController() {
-        PayloadCarryingRpcController delegate = super.newController();
+    public HBaseRpcController newController() {
+        HBaseRpcController delegate = super.newController();
         return getController(delegate);
     }
 
     @Override
-    public PayloadCarryingRpcController newController(CellScanner cellScanner) {
-        PayloadCarryingRpcController delegate = super.newController(cellScanner);
+    public HBaseRpcController newController(CellScanner cellScanner) {
+        HBaseRpcController delegate = super.newController(cellScanner);
         return getController(delegate);
     }
 
     @Override
-    public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
-        PayloadCarryingRpcController delegate = super.newController(cellIterables);
+    public HBaseRpcController newController(List<CellScannable> cellIterables) {
+        HBaseRpcController delegate = super.newController(cellIterables);
         return getController(delegate);
     }
     
-    private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) {
+    private HBaseRpcController getController(HBaseRpcController delegate) {
 		return new MetadataRpcController(delegate, conf);
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
index 86c4490..b8976ce 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java
@@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.ipc.controller;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -31,12 +31,12 @@ import com.google.protobuf.RpcController;
  * {@link RpcController} that sets the appropriate priority of RPC calls destined for Phoenix index
  * tables.
  */
-class IndexRpcController extends DelegatingPayloadCarryingRpcController {
+class IndexRpcController extends DelegatingHBaseRpcController {
 
     private final int priority;
     private final String tracingTableName;
     
-    public IndexRpcController(PayloadCarryingRpcController delegate, Configuration conf) {
+    public IndexRpcController(HBaseRpcController delegate, Configuration conf) {
         super(delegate);
         this.priority = PhoenixRpcSchedulerFactory.getIndexPriority(conf);
         this.tracingTableName = conf.get(QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java
index 89b49b7..5a7f75f 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 
 /**
@@ -39,24 +39,24 @@ public class InterRegionServerIndexRpcControllerFactory extends RpcControllerFac
     }
 
     @Override
-    public PayloadCarryingRpcController newController() {
-        PayloadCarryingRpcController delegate = super.newController();
+    public HBaseRpcController newController() {
+        HBaseRpcController delegate = super.newController();
         return getController(delegate);
     }
 
     @Override
-    public PayloadCarryingRpcController newController(CellScanner cellScanner) {
-        PayloadCarryingRpcController delegate = super.newController(cellScanner);
+    public HBaseRpcController newController(CellScanner cellScanner) {
+        HBaseRpcController delegate = super.newController(cellScanner);
         return getController(delegate);
     }
 
     @Override
-    public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
-        PayloadCarryingRpcController delegate = super.newController(cellIterables);
+    public HBaseRpcController newController(List<CellScannable> cellIterables) {
+        HBaseRpcController delegate = super.newController(cellIterables);
         return getController(delegate);
     }
 
-    private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) {
+    private HBaseRpcController getController(HBaseRpcController delegate) {
         // construct a chain of controllers: metadata, index and standard controller
         IndexRpcController indexRpcController = new IndexRpcController(delegate, conf);
         return new MetadataRpcController(indexRpcController, conf);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java
index ec4583b..37f3927 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 
 /**
@@ -37,24 +37,24 @@ public class InterRegionServerMetadataRpcControllerFactory extends RpcController
     }
 
     @Override
-    public PayloadCarryingRpcController newController() {
-        PayloadCarryingRpcController delegate = super.newController();
+    public HBaseRpcController newController() {
+        HBaseRpcController delegate = super.newController();
         return getController(delegate);
     }
 
     @Override
-    public PayloadCarryingRpcController newController(CellScanner cellScanner) {
-        PayloadCarryingRpcController delegate = super.newController(cellScanner);
+    public HBaseRpcController newController(CellScanner cellScanner) {
+        HBaseRpcController delegate = super.newController(cellScanner);
         return getController(delegate);
     }
 
     @Override
-    public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) {
-        PayloadCarryingRpcController delegate = super.newController(cellIterables);
+    public HBaseRpcController newController(List<CellScannable> cellIterables) {
+        HBaseRpcController delegate = super.newController(cellIterables);
         return getController(delegate);
     }
 
-    private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) {
+    private HBaseRpcController getController(HBaseRpcController delegate) {
         // construct a chain of controllers: metadata and delegate controller
         return new MetadataRpcController(delegate, conf);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/136c7a62/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
index e8fab25..cbeabaa 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java
@@ -21,8 +21,8 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.util.SchemaUtil;
@@ -34,7 +34,7 @@ import com.google.protobuf.RpcController;
  * {@link RpcController} that sets the appropriate priority of RPC calls destined for Phoenix SYSTEM
  * tables
  */
-class MetadataRpcController extends DelegatingPayloadCarryingRpcController {
+class MetadataRpcController extends DelegatingHBaseRpcController {
 
 	private int priority;
 	// list of system tables
@@ -53,7 +53,7 @@ class MetadataRpcController extends DelegatingPayloadCarryingRpcController {
                     .getNameAsString())
             .build();
 
-	public MetadataRpcController(PayloadCarryingRpcController delegate,
+	public MetadataRpcController(HBaseRpcController delegate,
 			Configuration conf) {
 		super(delegate);
 		this.priority = PhoenixRpcSchedulerFactory.getMetadataPriority(conf);