You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2017/11/22 09:06:48 UTC
phoenix git commit: PHOENIX-4317 Update RPC controller to use the updated APIs (Test fixes)
Repository: phoenix
Updated Branches:
refs/heads/5.x-HBase-2.0 4a3912904 -> 6db352aa3
PHOENIX-4317 Update RPC controller to use the updated APIs (Test fixes)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6db352aa
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6db352aa
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6db352aa
Branch: refs/heads/5.x-HBase-2.0
Commit: 6db352aa3acb2f33fc76f15487d22350b15ec0ec
Parents: 4a39129
Author: Ankit Singhal <an...@gmail.com>
Authored: Wed Nov 22 14:36:40 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Wed Nov 22 14:36:40 2017 +0530
----------------------------------------------------------------------
.../hadoop/hbase/ipc/PhoenixRpcScheduler.java | 7 ++-
.../hbase/ipc/PhoenixIndexRpcSchedulerTest.java | 48 +++++++++++++++-----
2 files changed, 42 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6db352aa/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
index ea4b431..a27c205 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
@@ -82,7 +82,7 @@ public class PhoenixRpcScheduler extends RpcScheduler {
@Override
public boolean dispatch(CallRunner callTask) throws InterruptedException, IOException {
ServerCall call = callTask.getCall();
- int priority = call.header.getPriority();
+ int priority = call.getHeader().getPriority();
if (indexPriority == priority) {
return indexCallExecutor.dispatch(callTask);
} else if (metadataPriority == priority) {
@@ -93,6 +93,11 @@ public class PhoenixRpcScheduler extends RpcScheduler {
}
@Override
+ public CallQueueInfo getCallQueueInfo() {
+ return delegate.getCallQueueInfo();
+ }
+
+ @Override
public int getGeneralQueueLength() {
// not the best way to calculate, but don't have a better way to hook
// into metrics at the moment
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6db352aa/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
index 3a2780d..3b80159 100644
--- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
+++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
@@ -17,18 +17,23 @@
*/
package org.apache.hadoop.hbase.ipc;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
import java.net.InetSocketAddress;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.RpcScheduler.Context;
-import org.apache.hadoop.hbase.ipc.RpcServer.Connection;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.junit.Test;
import org.mockito.Mockito;
@@ -41,12 +46,28 @@ public class PhoenixIndexRpcSchedulerTest {
private static final Configuration conf = HBaseConfiguration.create();
private static final InetSocketAddress isa = new InetSocketAddress("localhost", 0);
+
+ private class AbortServer implements Abortable {
+ private boolean aborted = false;
+
+ @Override
+ public void abort(String why, Throwable e) {
+ aborted = true;
+ }
+
+ @Override
+ public boolean isAborted() {
+ return aborted;
+ }
+ }
+
@Test
public void testIndexPriorityWritesToIndexHandler() throws Exception {
RpcScheduler mock = Mockito.mock(RpcScheduler.class);
-
- PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250);
- BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1, 1);
+ PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
+ Abortable abortable = new AbortServer();
+ PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250,qosFunction,abortable);
+ BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1,qosFunction,conf,abortable);
scheduler.setIndexExecutorForTesting(executor);
dispatchCallWithPriority(scheduler, 200);
List<BlockingQueue<CallRunner>> queues = executor.getQueues();
@@ -55,7 +76,7 @@ public class PhoenixIndexRpcSchedulerTest {
queue.poll(20, TimeUnit.SECONDS);
// try again, this time we tweak the ranges we support
- scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110);
+ scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110,qosFunction,abortable);
scheduler.setIndexExecutorForTesting(executor);
dispatchCallWithPriority(scheduler, 101);
queue.poll(20, TimeUnit.SECONDS);
@@ -72,13 +93,15 @@ public class PhoenixIndexRpcSchedulerTest {
*/
@Test
public void testDelegateWhenOutsideRange() throws Exception {
+ PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
+ Abortable abortable = new AbortServer();
RpcScheduler mock = Mockito.mock(RpcScheduler.class);
- PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250);
+ PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250,qosFunction,abortable);
dispatchCallWithPriority(scheduler, 100);
dispatchCallWithPriority(scheduler, 251);
// try again, this time we tweak the ranges we support
- scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110);
+ scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110,qosFunction,abortable);
dispatchCallWithPriority(scheduler, 200);
dispatchCallWithPriority(scheduler, 111);
@@ -88,12 +111,13 @@ public class PhoenixIndexRpcSchedulerTest {
}
private void dispatchCallWithPriority(RpcScheduler scheduler, int priority) throws Exception {
- Connection connection = Mockito.mock(Connection.class);
CallRunner task = Mockito.mock(CallRunner.class);
RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
- RpcServer server = new RpcServer(null, "test-rpcserver", null, isa, conf, scheduler);
- RpcServer.Call call =
- server.new Call(0, null, null, header, null, null, connection, null, 10, null, null, 0);
+ RpcServer server = RpcServerFactory.createRpcServer(null, "test-rpcserver", Lists.newArrayList(new BlockingServiceAndInterface(
+ SERVICE, null)), isa, conf, scheduler);
+ ServerCall call = Mockito.mock(ServerCall.class);
+ when(call.getHeader()).thenReturn(header);
+ when(call.getRequestUser()).thenReturn(Optional.empty());
Mockito.when(task.getCall()).thenReturn(call);
scheduler.dispatch(task);