You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:48:58 UTC
[38/50] [abbrv] flink git commit: [FLINK-4580] [rpc] Report rpc invocation exceptions to the caller
[FLINK-4580] [rpc] Report rpc invocation exceptions to the caller
This closes #2526.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6069d6e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6069d6e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6069d6e5
Branch: refs/heads/flip-6
Commit: 6069d6e55a7a5412a23a28683e94beda0c2e2cd7
Parents: 474ace7
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 21 15:18:27 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:44 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 53 ++++++++++++--------
.../runtime/rpc/akka/AkkaRpcActorTest.java | 34 +++++++++++++
2 files changed, 66 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6069d6e5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 59daa46..1b456a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,11 +87,11 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
unstashAll();
getContext().become(new Procedure<Object>() {
@Override
- public void apply(Object message) throws Exception {
- if (message.equals(Processing.STOP)) {
+ public void apply(Object msg) throws Exception {
+ if (msg.equals(Processing.STOP)) {
getContext().unbecome();
} else {
- handleMessage(message);
+ handleMessage(msg);
}
}
});
@@ -130,21 +131,36 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
* @param rpcInvocation Rpc invocation message
*/
private void handleRpcInvocation(RpcInvocation rpcInvocation) {
+ Method rpcMethod = null;
+
try {
String methodName = rpcInvocation.getMethodName();
Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
- Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
+ rpcMethod = lookupRpcMethod(methodName, parameterTypes);
+ } catch(ClassNotFoundException e) {
+ LOG.error("Could not load method arguments.", e);
+
+ RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e);
+ getSender().tell(new Status.Failure(rpcException), getSelf());
+ } catch (IOException e) {
+ LOG.error("Could not deserialize rpc invocation message.", e);
+
+ RpcConnectionException rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e);
+ getSender().tell(new Status.Failure(rpcException), getSelf());
+ } catch (final NoSuchMethodException e) {
+ LOG.error("Could not find rpc method for rpc invocation.", e);
+
+ RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
+ getSender().tell(new Status.Failure(rpcException), getSelf());
+ }
- if (rpcMethod.getReturnType().equals(Void.TYPE)) {
- // No return value to send back
- try {
+ if (rpcMethod != null) {
+ try {
+ if (rpcMethod.getReturnType().equals(Void.TYPE)) {
+ // No return value to send back
rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
- } catch (Throwable e) {
- LOG.error("Error while executing remote procedure call {}.", rpcMethod, e);
- }
- } else {
- try {
+ } else {
Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
if (result instanceof Future) {
@@ -169,17 +185,12 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
// tell the sender the result of the computation
getSender().tell(new Status.Success(result), getSelf());
}
- } catch (Throwable e) {
- // tell the sender about the failure
- getSender().tell(new Status.Failure(e), getSelf());
}
+ } catch (Throwable e) {
+ LOG.error("Error while executing remote procedure call {}.", rpcMethod, e);
+ // tell the sender about the failure
+ getSender().tell(new Status.Failure(e), getSelf());
}
- } catch(ClassNotFoundException e) {
- LOG.error("Could not load method arguments.", e);
- } catch (IOException e) {
- LOG.error("Could not deserialize rpc invocation message.", e);
- } catch (final NoSuchMethodException e) {
- LOG.error("Could not find rpc method for rpc invocation: {}.", rpcInvocation, e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6069d6e5/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 5624d12..1e8c9a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -119,10 +119,44 @@ public class AkkaRpcActorTest extends TestLogger {
rpcEndpoint.shutDown();
}
+ /**
+ * Tests that we receive a RpcConnectionException when calling a rpc method (with return type)
+ * on a wrong rpc endpoint.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testWrongGatewayEndpointConnection() throws Exception {
+ DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
+
+ rpcEndpoint.start();
+
+ Future<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
+
+ WrongRpcGateway gateway = Await.result(futureGateway, timeout.duration());
+
+ // since it is a tell operation we won't receive a RpcConnectionException, it's only logged
+ gateway.tell("foobar");
+
+ Future<Boolean> result = gateway.barfoo();
+
+ try {
+ Await.result(result, timeout.duration());
+ fail("We expected a RpcConnectionException.");
+ } catch (RpcConnectionException rpcConnectionException) {
+ // we expect this exception here
+ }
+ }
+
private interface DummyRpcGateway extends RpcGateway {
Future<Integer> foobar();
}
+ private interface WrongRpcGateway extends RpcGateway {
+ Future<Boolean> barfoo();
+ void tell(String message);
+ }
+
private static class DummyRpcEndpoint extends RpcEndpoint<DummyRpcGateway> {
private volatile int _foobar = 42;