You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/04 07:39:13 UTC
[11/50] [abbrv] hbase git commit: HBASE-17262 Refactor RpcServer so
as to make it extendable and/or pluggable
HBASE-17262 Refactor RpcServer so as to make it extendable and/or pluggable
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fc93de51
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fc93de51
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fc93de51
Branch: refs/heads/hbase-12439
Commit: fc93de51aff2c917a2b89694cf16ca37ccde6723
Parents: d787155
Author: binlijin <bi...@gmail.com>
Authored: Thu Dec 22 14:49:56 2016 +0800
Committer: binlijin <bi...@gmail.com>
Committed: Thu Dec 22 14:49:56 2016 +0800
----------------------------------------------------------------------
.../hbase/ipc/IntegrationTestRpcClient.java | 53 +-
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 3 +-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 2127 ++----------------
.../hadoop/hbase/ipc/RpcServerFactory.java | 58 +
.../hadoop/hbase/ipc/SimpleRpcServer.java | 1997 ++++++++++++++++
.../hbase/regionserver/RSRpcServices.java | 3 +-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 85 +-
.../hadoop/hbase/ipc/TestProtoBufRpc.java | 2 +-
.../hbase/ipc/TestRpcHandlerException.java | 19 +-
.../hadoop/hbase/security/TestSecureIPC.java | 3 +-
.../security/token/TestTokenAuthentication.java | 11 +-
11 files changed, 2283 insertions(+), 2078 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fc93de51/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index 7ce86bd..219a4e0 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -25,11 +25,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -45,20 +40,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Lists;
+
@Category(IntegrationTests.class)
public class IntegrationTestRpcClient {
@@ -72,26 +67,6 @@ public class IntegrationTestRpcClient {
conf = HBaseConfiguration.create();
}
- static class TestRpcServer extends RpcServer {
-
- TestRpcServer(Configuration conf) throws IOException {
- this(new FifoRpcScheduler(conf, 1), conf);
- }
-
- TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
- super(null, "testRpcServer", Lists
- .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
- "localhost", 0), conf, scheduler);
- }
-
- @Override
- public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
- Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
- throws IOException {
- return super.call(service, md, param, cellScanner, receiveTime, status);
- }
- }
-
protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
@Override
@@ -116,8 +91,8 @@ public class IntegrationTestRpcClient {
class Cluster {
Random random = new Random();
ReadWriteLock lock = new ReentrantReadWriteLock();
- HashMap<InetSocketAddress, TestRpcServer> rpcServers = new HashMap<>();
- List<TestRpcServer> serverList = new ArrayList<>();
+ HashMap<InetSocketAddress, RpcServer> rpcServers = new HashMap<>();
+ List<RpcServer> serverList = new ArrayList<>();
int maxServers;
int minServers;
@@ -126,14 +101,18 @@ public class IntegrationTestRpcClient {
this.maxServers = maxServers;
}
- TestRpcServer startServer() throws IOException {
+ RpcServer startServer() throws IOException {
lock.writeLock().lock();
try {
if (rpcServers.size() >= maxServers) {
return null;
}
- TestRpcServer rpcServer = new TestRpcServer(conf);
+ RpcServer rpcServer = RpcServerFactory.createRpcServer(null,
+ "testRpcServer", Lists
+ .newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
+ new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(
+ conf, 1));
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
@@ -150,7 +129,7 @@ public class IntegrationTestRpcClient {
void stopRandomServer() throws Exception {
lock.writeLock().lock();
- TestRpcServer rpcServer = null;
+ RpcServer rpcServer = null;
try {
if (rpcServers.size() <= minServers) {
return;
@@ -174,7 +153,7 @@ public class IntegrationTestRpcClient {
}
}
- void stopServer(TestRpcServer rpcServer) throws InterruptedException {
+ void stopServer(RpcServer rpcServer) throws InterruptedException {
InetSocketAddress address = rpcServer.getListenerAddress();
LOG.info("Stopping server: " + address);
rpcServer.stop();
@@ -185,7 +164,7 @@ public class IntegrationTestRpcClient {
void stopRunning() throws InterruptedException {
lock.writeLock().lock();
try {
- for (TestRpcServer rpcServer : serverList) {
+ for (RpcServer rpcServer : serverList) {
stopServer(rpcServer);
}
@@ -194,7 +173,7 @@ public class IntegrationTestRpcClient {
}
}
- TestRpcServer getRandomServer() {
+ RpcServer getRandomServer() {
lock.readLock().lock();
try {
int size = rpcServers.size();
@@ -278,7 +257,7 @@ public class IntegrationTestRpcClient {
String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
EchoResponseProto ret;
- TestRpcServer server = cluster.getRandomServer();
+ RpcServer server = cluster.getRandomServer();
try {
sending.set(true);
BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress());
http://git-wip-us.apache.org/repos/asf/hbase/blob/fc93de51/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 5301a67..0aabc10 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -1,4 +1,3 @@
-package org.apache.hadoop.hbase.ipc;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -16,6 +15,8 @@ package org.apache.hadoop.hbase.ipc;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.hadoop.hbase.ipc;
+
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;