You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2017/11/22 09:06:48 UTC

phoenix git commit: PHOENIX-4317 Update RPC controller to use the updated APIs (Test fixes)

Repository: phoenix
Updated Branches:
  refs/heads/5.x-HBase-2.0 4a3912904 -> 6db352aa3


PHOENIX-4317 Update RPC controller to use the updated APIs (Test fixes)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6db352aa
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6db352aa
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6db352aa

Branch: refs/heads/5.x-HBase-2.0
Commit: 6db352aa3acb2f33fc76f15487d22350b15ec0ec
Parents: 4a39129
Author: Ankit Singhal <an...@gmail.com>
Authored: Wed Nov 22 14:36:40 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Wed Nov 22 14:36:40 2017 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/PhoenixRpcScheduler.java   |  7 ++-
 .../hbase/ipc/PhoenixIndexRpcSchedulerTest.java | 48 +++++++++++++++-----
 2 files changed, 42 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6db352aa/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
index ea4b431..a27c205 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
@@ -82,7 +82,7 @@ public class PhoenixRpcScheduler extends RpcScheduler {
     @Override
     public boolean dispatch(CallRunner callTask) throws InterruptedException, IOException {
         ServerCall call = callTask.getCall();
-        int priority = call.header.getPriority();
+        int priority = call.getHeader().getPriority();
         if (indexPriority == priority) {
             return indexCallExecutor.dispatch(callTask);
         } else if (metadataPriority == priority) {
@@ -93,6 +93,11 @@ public class PhoenixRpcScheduler extends RpcScheduler {
     }
 
     @Override
+    public CallQueueInfo getCallQueueInfo() {
+        return delegate.getCallQueueInfo();
+    }
+
+    @Override
     public int getGeneralQueueLength() {
         // not the best way to calculate, but don't have a better way to hook
         // into metrics at the moment

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6db352aa/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
index 3a2780d..3b80159 100644
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
+++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
@@ -17,18 +17,23 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
 
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ipc.RpcScheduler.Context;
-import org.apache.hadoop.hbase.ipc.RpcServer.Connection;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -41,12 +46,28 @@ public class PhoenixIndexRpcSchedulerTest {
     private static final Configuration conf = HBaseConfiguration.create();
     private static final InetSocketAddress isa = new InetSocketAddress("localhost", 0);
 
+    
+    private class AbortServer implements Abortable {
+      private boolean aborted = false;
+
+      @Override
+      public void abort(String why, Throwable e) {
+        aborted = true;
+      }
+
+      @Override
+      public boolean isAborted() {
+        return aborted;
+      }
+    }
+
     @Test
     public void testIndexPriorityWritesToIndexHandler() throws Exception {
         RpcScheduler mock = Mockito.mock(RpcScheduler.class);
-
-        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250);
-        BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1, 1);
+        PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
+        Abortable abortable = new AbortServer();
+        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250,qosFunction,abortable);
+        BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1,qosFunction,conf,abortable);
         scheduler.setIndexExecutorForTesting(executor);
         dispatchCallWithPriority(scheduler, 200);
         List<BlockingQueue<CallRunner>> queues = executor.getQueues();
@@ -55,7 +76,7 @@ public class PhoenixIndexRpcSchedulerTest {
         queue.poll(20, TimeUnit.SECONDS);
 
         // try again, this time we tweak the ranges we support
-        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110);
+        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110,qosFunction,abortable);
         scheduler.setIndexExecutorForTesting(executor);
         dispatchCallWithPriority(scheduler, 101);
         queue.poll(20, TimeUnit.SECONDS);
@@ -72,13 +93,15 @@ public class PhoenixIndexRpcSchedulerTest {
      */
     @Test
     public void testDelegateWhenOutsideRange() throws Exception {
+        PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
+        Abortable abortable = new AbortServer();
         RpcScheduler mock = Mockito.mock(RpcScheduler.class);
-        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250);
+        PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250,qosFunction,abortable);
         dispatchCallWithPriority(scheduler, 100);
         dispatchCallWithPriority(scheduler, 251);
 
         // try again, this time we tweak the ranges we support
-        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110);
+        scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110,qosFunction,abortable);
         dispatchCallWithPriority(scheduler, 200);
         dispatchCallWithPriority(scheduler, 111);
 
@@ -88,12 +111,13 @@ public class PhoenixIndexRpcSchedulerTest {
     }
 
     private void dispatchCallWithPriority(RpcScheduler scheduler, int priority) throws Exception {
-        Connection connection = Mockito.mock(Connection.class);
         CallRunner task = Mockito.mock(CallRunner.class);
         RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
-        RpcServer server = new RpcServer(null, "test-rpcserver", null, isa, conf, scheduler);
-        RpcServer.Call call =
-                server.new Call(0, null, null, header, null, null, connection, null, 10, null, null, 0);
+        RpcServer server = RpcServerFactory.createRpcServer(null, "test-rpcserver", Lists.newArrayList(new BlockingServiceAndInterface(
+                SERVICE, null)), isa, conf, scheduler);
+        ServerCall call = Mockito.mock(ServerCall.class);
+        when(call.getHeader()).thenReturn(header);
+        when(call.getRequestUser()).thenReturn(Optional.empty());
         Mockito.when(task.getCall()).thenReturn(call);
 
         scheduler.dispatch(task);