You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:29 UTC

[24/50] [abbrv] flink git commit: [hotfix] Add self rpc gateway registration to TestingSerialRpcService

[hotfix] Add self rpc gateway registration to TestingSerialRpcService


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73618057
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73618057
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73618057

Branch: refs/heads/flip-6
Commit: 73618057b0a40f2a3ba3db509d43d0a45c3f384c
Parents: e114d13
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Sep 2 14:51:16 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:44 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/TestingSerialRpcService.java  | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/73618057/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 7bdbb99..955edcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -43,7 +43,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 
 /**
- * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread.
+ * An RPC Service implementation for testing. This RPC service directly executes all asynchronous
+ * calls one by one in the calling thread.
  */
 public class TestingSerialRpcService implements RpcService {
 
@@ -52,7 +53,7 @@ public class TestingSerialRpcService implements RpcService {
 
 	public TestingSerialRpcService() {
 		executorService = new DirectExecutorService();
-		this.registeredConnections = new ConcurrentHashMap<>();
+		this.registeredConnections = new ConcurrentHashMap<>(16);
 	}
 
 	@Override
@@ -78,14 +79,14 @@ public class TestingSerialRpcService implements RpcService {
 
 	@Override
 	public void stopServer(RpcGateway selfGateway) {
-
+		registeredConnections.remove(selfGateway.getAddress());
 	}
 
 	@Override
 	public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
 		final String address = UUID.randomUUID().toString();
 
-		InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint);
+		InvocationHandler akkaInvocationHandler = new TestingSerialRpcService.TestingSerialInvocationHandler<>(address, rpcEndpoint);
 		ClassLoader classLoader = getClass().getClassLoader();
 
 		@SuppressWarnings("unchecked")
@@ -99,6 +100,9 @@ public class TestingSerialRpcService implements RpcService {
 			},
 			akkaInvocationHandler);
 
+		// register self
+		registeredConnections.putIfAbsent(self.getAddress(), self);
+
 		return self;
 	}
 
@@ -133,7 +137,7 @@ public class TestingSerialRpcService implements RpcService {
 		}
 	}
 
-	private static class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
+	private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
 
 		private final T rpcEndpoint;
 
@@ -197,7 +201,7 @@ public class TestingSerialRpcService implements RpcService {
 			final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
 			Object result = rpcMethod.invoke(rpcEndpoint, args);
 
-			if (result != null && result instanceof Future) {
+			if (result instanceof Future) {
 				Future<?> future = (Future<?>) result;
 				return Await.result(future, futureTimeout.duration());
 			} else {