You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/05 13:42:00 UTC
[flink] 03/05: [FLINK-24706][rpc] Forward deserialization errors to
returned future
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4651be9011ff6661d4de9347268a80c8317f11d9
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Oct 29 15:34:53 2021 +0200
[FLINK-24706][rpc] Forward deserialization errors to returned future
---
.../runtime/rpc/akka/AkkaInvocationHandler.java | 13 ++--
.../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 76 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 4 deletions(-)
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index b4fd6ab..db73771 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -237,7 +237,10 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null;
// execute an asynchronous call
- final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
+ final CompletableFuture<?> resultFuture =
+ ask(rpcInvocation, futureTimeout)
+ .thenApply(
+ resultValue -> deserializeValueIfNeeded(resultValue, method));
final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
resultFuture.whenComplete(
@@ -245,10 +248,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
if (failure != null) {
completableFuture.completeExceptionally(
resolveTimeoutException(
- failure, callStackCapture, address, rpcInvocation));
+ ExceptionUtils.stripCompletionException(failure),
+ callStackCapture,
+ address,
+ rpcInvocation));
} else {
- completableFuture.complete(
- deserializeValueIfNeeded(resultValue, method));
+ completableFuture.complete(resultValue);
}
});
diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index e99b571..29f973e 100644
--- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
@@ -53,6 +54,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
@@ -237,6 +240,53 @@ public class AkkaRpcActorTest extends TestLogger {
}
}
+ /**
+ * Tests that the AkkaInvocationHandler properly fails the returned future if the response
+ * cannot be deserialized.
+ */
+ @Test
+ public void testResultFutureFailsOnDeserializationError() throws Exception {
+ // setup 2 actor systems and rpc services that support remote connections (for which RPCs go
+ // through serialization)
+ final AkkaRpcService serverAkkaRpcService =
+ new AkkaRpcService(
+ AkkaUtils.createActorSystem(
+ "serverActorSystem",
+ AkkaUtils.getAkkaConfig(
+ new Configuration(), new HostAndPort("localhost", 0))),
+ AkkaRpcServiceConfiguration.defaultConfiguration());
+
+ final AkkaRpcService clientAkkaRpcService =
+ new AkkaRpcService(
+ AkkaUtils.createActorSystem(
+ "clientActorSystem",
+ AkkaUtils.getAkkaConfig(
+ new Configuration(), new HostAndPort("localhost", 0))),
+ AkkaRpcServiceConfiguration.defaultConfiguration());
+
+ try {
+ final DeserializatonFailingEndpoint rpcEndpoint =
+ new DeserializatonFailingEndpoint(serverAkkaRpcService);
+ rpcEndpoint.start();
+
+ final DeserializatonFailingGateway rpcGateway =
+ rpcEndpoint.getSelfGateway(DeserializatonFailingGateway.class);
+
+ final DeserializatonFailingGateway connect =
+ clientAkkaRpcService
+ .connect(rpcGateway.getAddress(), DeserializatonFailingGateway.class)
+ .get();
+
+ assertThat(
+ connect.doStuff(),
+ FlinkMatchers.futureWillCompleteExceptionally(
+ RpcException.class, Duration.ofHours(1)));
+ } finally {
+ RpcUtils.terminateRpcService(clientAkkaRpcService, timeout);
+ RpcUtils.terminateRpcService(serverAkkaRpcService, timeout);
+ }
+ }
+
/** Tests that exception thrown in the onStop method are returned by the termination future. */
@Test
public void testOnStopExceptionPropagation() throws Exception {
@@ -748,6 +798,32 @@ public class AkkaRpcActorTest extends TestLogger {
// ------------------------------------------------------------------------
+ private interface DeserializatonFailingGateway extends RpcGateway {
+ CompletableFuture<DeserializationFailingObject> doStuff();
+ }
+
+ private static class DeserializatonFailingEndpoint extends RpcEndpoint
+ implements DeserializatonFailingGateway {
+
+ protected DeserializatonFailingEndpoint(RpcService rpcService) {
+ super(rpcService);
+ }
+
+ @Override
+ public CompletableFuture<DeserializationFailingObject> doStuff() {
+ return CompletableFuture.completedFuture(new DeserializationFailingObject());
+ }
+ }
+
+ private static class DeserializationFailingObject implements Serializable {
+ private void readObject(ObjectInputStream aInputStream)
+ throws ClassNotFoundException, IOException {
+ throw new ClassNotFoundException("test exception");
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
private static class SimpleRpcEndpoint extends RpcEndpoint implements RpcGateway {
protected SimpleRpcEndpoint(RpcService rpcService, String endpointId) {