You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:48:58 UTC

[38/50] [abbrv] flink git commit: [FLINK-4580] [rpc] Report rpc invocation exceptions to the caller

[FLINK-4580] [rpc] Report rpc invocation exceptions to the caller

This closes #2526.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6069d6e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6069d6e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6069d6e5

Branch: refs/heads/flip-6
Commit: 6069d6e55a7a5412a23a28683e94beda0c2e2cd7
Parents: 474ace7
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 21 15:18:27 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:44 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 53 ++++++++++++--------
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 34 +++++++++++++
 2 files changed, 66 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6069d6e5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 59daa46..1b456a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,11 +87,11 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 			unstashAll();
 			getContext().become(new Procedure<Object>() {
 				@Override
-				public void apply(Object message) throws Exception {
-					if (message.equals(Processing.STOP)) {
+				public void apply(Object msg) throws Exception {
+					if (msg.equals(Processing.STOP)) {
 						getContext().unbecome();
 					} else {
-						handleMessage(message);
+						handleMessage(msg);
 					}
 				}
 			});
@@ -130,21 +131,36 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 	 * @param rpcInvocation Rpc invocation message
 	 */
 	private void handleRpcInvocation(RpcInvocation rpcInvocation) {
+		Method rpcMethod = null;
+
 		try {
 			String methodName = rpcInvocation.getMethodName();
 			Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
 
-			Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
+			rpcMethod = lookupRpcMethod(methodName, parameterTypes);
+		} catch(ClassNotFoundException e) {
+			LOG.error("Could not load method arguments.", e);
+
+			RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e);
+			getSender().tell(new Status.Failure(rpcException), getSelf());
+		} catch (IOException e) {
+			LOG.error("Could not deserialize rpc invocation message.", e);
+
+			RpcConnectionException rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e);
+			getSender().tell(new Status.Failure(rpcException), getSelf());
+		} catch (final NoSuchMethodException e) {
+			LOG.error("Could not find rpc method for rpc invocation.", e);
+
+			RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
+			getSender().tell(new Status.Failure(rpcException), getSelf());
+		}
 
-			if (rpcMethod.getReturnType().equals(Void.TYPE)) {
-				// No return value to send back
-				try {
+		if (rpcMethod != null) {
+			try {
+				if (rpcMethod.getReturnType().equals(Void.TYPE)) {
+					// No return value to send back
 					rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
-				} catch (Throwable e) {
-					LOG.error("Error while executing remote procedure call {}.", rpcMethod, e);
-				}
-			} else {
-				try {
+				} else {
 					Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
 
 					if (result instanceof Future) {
@@ -169,17 +185,12 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 						// tell the sender the result of the computation
 						getSender().tell(new Status.Success(result), getSelf());
 					}
-				} catch (Throwable e) {
-					// tell the sender about the failure
-					getSender().tell(new Status.Failure(e), getSelf());
 				}
+			} catch (Throwable e) {
+				LOG.error("Error while executing remote procedure call {}.", rpcMethod, e);
+				// tell the sender about the failure
+				getSender().tell(new Status.Failure(e), getSelf());
 			}
-		} catch(ClassNotFoundException e) {
-			LOG.error("Could not load method arguments.", e);
-		} catch (IOException e) {
-			LOG.error("Could not deserialize rpc invocation message.", e);
-		} catch (final NoSuchMethodException e) {
-			LOG.error("Could not find rpc method for rpc invocation: {}.", rpcInvocation, e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6069d6e5/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 5624d12..1e8c9a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -119,10 +119,44 @@ public class AkkaRpcActorTest extends TestLogger {
 		rpcEndpoint.shutDown();
 	}
 
+	/**
+	 * Tests that we receive an RpcConnectionException when calling an rpc method (with return type)
+	 * through a gateway type (WrongRpcGateway) which does not match the connected rpc endpoint.
+	 *
+	 * @throws Exception if connecting the gateway or any other test step fails unexpectedly
+	 */
+	@Test
+	public void testWrongGatewayEndpointConnection() throws Exception {
+		DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
+
+		rpcEndpoint.start();
+
+		Future<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
+
+		WrongRpcGateway gateway = Await.result(futureGateway, timeout.duration());
+
+		// tell returns nothing, so no RpcConnectionException can reach us here; the failure is only logged
+		gateway.tell("foobar");
+
+		Future<Boolean> result = gateway.barfoo();
+
+		try {
+			Await.result(result, timeout.duration());
+			fail("We expected a RpcConnectionException.");
+		} catch (RpcConnectionException rpcConnectionException) {
+			// expected: the failed rpc invocation is reported back to the caller through the future
+		}
+	}
+
 	private interface DummyRpcGateway extends RpcGateway {
 		Future<Integer> foobar();
 	}
 
+	private interface WrongRpcGateway extends RpcGateway {
+		Future<Boolean> barfoo();
+		void tell(String message);
+	}
+
 	private static class DummyRpcEndpoint extends RpcEndpoint<DummyRpcGateway> {
 
 		private volatile int _foobar = 42;