You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/05 01:59:56 UTC
[07/10] flink git commit: [FLINK-5239] [distributed coordination] RPC
service properly unpacks 'InvocationTargetExceptions'
[FLINK-5239] [distributed coordination] RPC service properly unpacks 'InvocationTargetExceptions'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/887cbb90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/887cbb90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/887cbb90
Branch: refs/heads/flip-6
Commit: 887cbb9095af92e4788c06ba0307cc9db5c5b948
Parents: 44fc46d
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 18:49:21 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 16 +++-
.../runtime/rpc/akka/AkkaRpcActorTest.java | 89 +++++++++++++++++++-
2 files changed, 102 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/887cbb90/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index fe6b23b..264ba96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -180,8 +181,19 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
if (rpcMethod.getReturnType().equals(Void.TYPE)) {
// No return value to send back
rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
- } else {
- Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+ }
+ else {
+ final Object result;
+ try {
+ result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+ }
+ catch (InvocationTargetException e) {
+ LOG.trace("Reporting back error thrown in remote procedure {}", rpcMethod, e);
+
+ // tell the sender about the failure
+ getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
+ return;
+ }
if (result instanceof Future) {
final Future<?> future = (Future<?>) result;
http://git-wip-us.apache.org/repos/asf/flink/blob/887cbb90/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 760e1a7..c73240c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.TestLogger;
+
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Test;
@@ -86,7 +88,7 @@ public class AkkaRpcActorTest extends TestLogger {
Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
try {
- DummyRpcGateway gateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
+ futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
fail("The rpc connection resolution should have failed.");
} catch (ExecutionException exception) {
@@ -192,6 +194,48 @@ public class AkkaRpcActorTest extends TestLogger {
terminationFuture.get();
}
+ @Test
+ public void testExceptionPropagation() throws Exception {
+ ExceptionalEndpoint rpcEndpoint = new ExceptionalEndpoint(akkaRpcService);
+ rpcEndpoint.start();
+
+ ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
+ Future<Integer> result = rpcGateway.doStuff();
+
+ try {
+ result.get(timeout.getSize(), timeout.getUnit());
+ fail("this should fail with an exception");
+ }
+ catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertEquals(RuntimeException.class, cause.getClass());
+ assertEquals("my super specific test exception", cause.getMessage());
+ }
+ }
+
+ @Test
+ public void testExceptionPropagationFuturePiping() throws Exception {
+ ExceptionalFutureEndpoint rpcEndpoint = new ExceptionalFutureEndpoint(akkaRpcService);
+ rpcEndpoint.start();
+
+ ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
+ Future<Integer> result = rpcGateway.doStuff();
+
+ try {
+ result.get(timeout.getSize(), timeout.getUnit());
+ fail("this should fail with an exception");
+ }
+ catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertEquals(Exception.class, cause.getClass());
+ assertEquals("some test", cause.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Test Actors and Interfaces
+ // ------------------------------------------------------------------------
+
private interface DummyRpcGateway extends RpcGateway {
Future<Integer> foobar();
}
@@ -218,4 +262,47 @@ public class AkkaRpcActorTest extends TestLogger {
_foobar = value;
}
}
+
+ // ------------------------------------------------------------------------
+
+ private interface ExceptionalGateway extends RpcGateway {
+ Future<Integer> doStuff();
+ }
+
+ private static class ExceptionalEndpoint extends RpcEndpoint<ExceptionalGateway> {
+
+ protected ExceptionalEndpoint(RpcService rpcService) {
+ super(rpcService);
+ }
+
+ @RpcMethod
+ public int doStuff() {
+ throw new RuntimeException("my super specific test exception");
+ }
+ }
+
+ private static class ExceptionalFutureEndpoint extends RpcEndpoint<ExceptionalGateway> {
+
+ protected ExceptionalFutureEndpoint(RpcService rpcService) {
+ super(rpcService);
+ }
+
+ @RpcMethod
+ public Future<Integer> doStuff() {
+ final FlinkCompletableFuture<Integer> future = new FlinkCompletableFuture<>();
+
+ // complete the future slightly in the, well, future...
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignored) {}
+ future.completeExceptionally(new Exception("some test"));
+ }
+ }.start();
+
+ return future;
+ }
+ }
}