You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:23:07 UTC

[48/52] [abbrv] flink git commit: [FLINK-5239] [distributed coordination] RPC service properly unpacks 'InvocationTargetExceptions'

[FLINK-5239] [distributed coordination] RPC service properly unpacks 'InvocationTargetExceptions'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95c68299
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95c68299
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95c68299

Branch: refs/heads/master
Commit: 95c68299a6e2ca478e100602ad2c24b05059ad21
Parents: 10f7e86
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 18:49:21 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:27 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 16 +++-
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 89 +++++++++++++++++++-
 2 files changed, 102 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95c68299/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index fe6b23b..264ba96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -180,8 +181,19 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 				if (rpcMethod.getReturnType().equals(Void.TYPE)) {
 					// No return value to send back
 					rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
-				} else {
-					Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+				}
+				else {
+					final Object result;
+					try {
+						result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+					}
+					catch (InvocationTargetException e) {
+						LOG.trace("Reporting back error thrown in remote procedure {}", rpcMethod, e);
+
+						// tell the sender about the failure
+						getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
+						return;
+					}
 
 					if (result instanceof Future) {
 						final Future<?> future = (Future<?>) result;

http://git-wip-us.apache.org/repos/asf/flink/blob/95c68299/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 760e1a7..c73240c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.TestLogger;
+
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -86,7 +88,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
 
 		try {
-			DummyRpcGateway gateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
+			futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
 
 			fail("The rpc connection resolution should have failed.");
 		} catch (ExecutionException exception) {
@@ -192,6 +194,48 @@ public class AkkaRpcActorTest extends TestLogger {
 		terminationFuture.get();
 	}
 
+	@Test
+	public void testExceptionPropagation() throws Exception {
+		ExceptionalEndpoint rpcEndpoint = new ExceptionalEndpoint(akkaRpcService);
+		rpcEndpoint.start();
+
+		ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
+		Future<Integer> result = rpcGateway.doStuff();
+
+		try {
+			result.get(timeout.getSize(), timeout.getUnit());
+			fail("this should fail with an exception");
+		}
+		catch (ExecutionException e) {
+			Throwable cause = e.getCause();
+			assertEquals(RuntimeException.class, cause.getClass());
+			assertEquals("my super specific test exception", cause.getMessage());
+		}
+	}
+
+	@Test
+	public void testExceptionPropagationFuturePiping() throws Exception {
+		ExceptionalFutureEndpoint rpcEndpoint = new ExceptionalFutureEndpoint(akkaRpcService);
+		rpcEndpoint.start();
+
+		ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
+		Future<Integer> result = rpcGateway.doStuff();
+
+		try {
+			result.get(timeout.getSize(), timeout.getUnit());
+			fail("this should fail with an exception");
+		}
+		catch (ExecutionException e) {
+			Throwable cause = e.getCause();
+			assertEquals(Exception.class, cause.getClass());
+			assertEquals("some test", cause.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test Actors and Interfaces
+	// ------------------------------------------------------------------------
+
 	private interface DummyRpcGateway extends RpcGateway {
 		Future<Integer> foobar();
 	}
@@ -218,4 +262,47 @@ public class AkkaRpcActorTest extends TestLogger {
 			_foobar = value;
 		}
 	}
+
+	// ------------------------------------------------------------------------
+
+	private interface ExceptionalGateway extends RpcGateway {
+		Future<Integer> doStuff();
+	}
+
+	private static class ExceptionalEndpoint extends RpcEndpoint<ExceptionalGateway> {
+
+		protected ExceptionalEndpoint(RpcService rpcService) {
+			super(rpcService);
+		}
+
+		@RpcMethod
+		public int doStuff() {
+			throw new RuntimeException("my super specific test exception");
+		}
+	}
+
+	private static class ExceptionalFutureEndpoint extends RpcEndpoint<ExceptionalGateway> {
+
+		protected ExceptionalFutureEndpoint(RpcService rpcService) {
+			super(rpcService);
+		}
+
+		@RpcMethod
+		public Future<Integer> doStuff() {
+			final FlinkCompletableFuture<Integer> future = new FlinkCompletableFuture<>();
+
+			// complete the future slightly in the, well, future...
+			new Thread() {
+				@Override
+				public void run() {
+					try {
+						Thread.sleep(10);
+					} catch (InterruptedException ignored) {}
+					future.completeExceptionally(new Exception("some test"));
+				}
+			}.start();
+
+			return future;
+		}
+	}
 }