You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/04 07:39:13 UTC

[11/50] [abbrv] hbase git commit: HBASE-17262 Refactor RpcServer so as to make it extendable and/or pluggable

HBASE-17262 Refactor RpcServer so as to make it extendable and/or pluggable


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fc93de51
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fc93de51
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fc93de51

Branch: refs/heads/hbase-12439
Commit: fc93de51aff2c917a2b89694cf16ca37ccde6723
Parents: d787155
Author: binlijin <bi...@gmail.com>
Authored: Thu Dec 22 14:49:56 2016 +0800
Committer: binlijin <bi...@gmail.com>
Committed: Thu Dec 22 14:49:56 2016 +0800

----------------------------------------------------------------------
 .../hbase/ipc/IntegrationTestRpcClient.java     |   53 +-
 .../org/apache/hadoop/hbase/ipc/CallRunner.java |    3 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 2127 ++----------------
 .../hadoop/hbase/ipc/RpcServerFactory.java      |   58 +
 .../hadoop/hbase/ipc/SimpleRpcServer.java       | 1997 ++++++++++++++++
 .../hbase/regionserver/RSRpcServices.java       |    3 +-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       |   85 +-
 .../hadoop/hbase/ipc/TestProtoBufRpc.java       |    2 +-
 .../hbase/ipc/TestRpcHandlerException.java      |   19 +-
 .../hadoop/hbase/security/TestSecureIPC.java    |    3 +-
 .../security/token/TestTokenAuthentication.java |   11 +-
 11 files changed, 2283 insertions(+), 2078 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fc93de51/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index 7ce86bd..219a4e0 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -25,11 +25,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -45,20 +40,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Lists;
+
 @Category(IntegrationTests.class)
 public class IntegrationTestRpcClient {
 
@@ -72,26 +67,6 @@ public class IntegrationTestRpcClient {
     conf = HBaseConfiguration.create();
   }
 
-  static class TestRpcServer extends RpcServer {
-
-    TestRpcServer(Configuration conf) throws IOException {
-      this(new FifoRpcScheduler(conf, 1), conf);
-    }
-
-    TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
-      super(null, "testRpcServer", Lists
-          .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
-          "localhost", 0), conf, scheduler);
-    }
-
-    @Override
-    public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
-        Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
-        throws IOException {
-      return super.call(service, md, param, cellScanner, receiveTime, status);
-    }
-  }
-
   protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
     return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
       @Override
@@ -116,8 +91,8 @@ public class IntegrationTestRpcClient {
   class Cluster {
     Random random = new Random();
     ReadWriteLock lock = new ReentrantReadWriteLock();
-    HashMap<InetSocketAddress, TestRpcServer> rpcServers = new HashMap<>();
-    List<TestRpcServer> serverList = new ArrayList<>();
+    HashMap<InetSocketAddress, RpcServer> rpcServers = new HashMap<>();
+    List<RpcServer> serverList = new ArrayList<>();
     int maxServers;
     int minServers;
 
@@ -126,14 +101,18 @@ public class IntegrationTestRpcClient {
       this.maxServers = maxServers;
     }
 
-    TestRpcServer startServer() throws IOException {
+    RpcServer startServer() throws IOException {
       lock.writeLock().lock();
       try {
         if (rpcServers.size() >= maxServers) {
           return null;
         }
 
-        TestRpcServer rpcServer = new TestRpcServer(conf);
+        RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
+            "testRpcServer", Lists
+                .newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
+            new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(
+                conf, 1));
         rpcServer.start();
         InetSocketAddress address = rpcServer.getListenerAddress();
         if (address == null) {
@@ -150,7 +129,7 @@ public class IntegrationTestRpcClient {
 
     void stopRandomServer() throws Exception {
       lock.writeLock().lock();
-      TestRpcServer rpcServer = null;
+      RpcServer rpcServer = null;
       try {
         if (rpcServers.size() <= minServers) {
           return;
@@ -174,7 +153,7 @@ public class IntegrationTestRpcClient {
       }
     }
 
-    void stopServer(TestRpcServer rpcServer) throws InterruptedException {
+    void stopServer(RpcServer rpcServer) throws InterruptedException {
       InetSocketAddress address = rpcServer.getListenerAddress();
       LOG.info("Stopping server: " + address);
       rpcServer.stop();
@@ -185,7 +164,7 @@ public class IntegrationTestRpcClient {
     void stopRunning() throws InterruptedException {
       lock.writeLock().lock();
       try {
-        for (TestRpcServer rpcServer : serverList) {
+        for (RpcServer rpcServer : serverList) {
           stopServer(rpcServer);
         }
 
@@ -194,7 +173,7 @@ public class IntegrationTestRpcClient {
       }
     }
 
-    TestRpcServer getRandomServer() {
+    RpcServer getRandomServer() {
       lock.readLock().lock();
       try {
         int size = rpcServers.size();
@@ -278,7 +257,7 @@ public class IntegrationTestRpcClient {
         String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
         EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
         EchoResponseProto ret;
-        TestRpcServer server = cluster.getRandomServer();
+        RpcServer server = cluster.getRandomServer();
         try {
           sending.set(true);
           BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress());

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc93de51/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 5301a67..0aabc10 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -1,4 +1,3 @@
-package org.apache.hadoop.hbase.ipc;
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -16,6 +15,8 @@ package org.apache.hadoop.hbase.ipc;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.hadoop.hbase.ipc;
+
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;