You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/05 13:42:00 UTC

[flink] 03/05: [FLINK-24706][rpc] Forward deserialization errors to returned future

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4651be9011ff6661d4de9347268a80c8317f11d9
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Oct 29 15:34:53 2021 +0200

    [FLINK-24706][rpc] Forward deserialization errors to returned future
---
 .../runtime/rpc/akka/AkkaInvocationHandler.java    | 13 ++--
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   | 76 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index b4fd6ab..db73771 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -237,7 +237,10 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
             final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null;
 
             // execute an asynchronous call
-            final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
+            final CompletableFuture<?> resultFuture =
+                    ask(rpcInvocation, futureTimeout)
+                            .thenApply(
+                                    resultValue -> deserializeValueIfNeeded(resultValue, method));
 
             final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
             resultFuture.whenComplete(
@@ -245,10 +248,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
                         if (failure != null) {
                             completableFuture.completeExceptionally(
                                     resolveTimeoutException(
-                                            failure, callStackCapture, address, rpcInvocation));
+                                            ExceptionUtils.stripCompletionException(failure),
+                                            callStackCapture,
+                                            address,
+                                            rpcInvocation));
                         } else {
-                            completableFuture.complete(
-                                    deserializeValueIfNeeded(resultValue, method));
+                            completableFuture.complete(resultValue);
                         }
                     });
 
diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index e99b571..29f973e 100644
--- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -53,6 +54,8 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
 import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
@@ -237,6 +240,53 @@ public class AkkaRpcActorTest extends TestLogger {
         }
     }
 
+    /**
+     * Tests that the AkkaInvocationHandler properly fails the returned future if the response
+     * cannot be deserialized.
+     */
+    @Test
+    public void testResultFutureFailsOnDeserializationError() throws Exception {
+        // set up 2 actor systems and rpc services that support remote connections (for which RPCs
+        // go through serialization)
+        final AkkaRpcService serverAkkaRpcService =
+                new AkkaRpcService(
+                        AkkaUtils.createActorSystem(
+                                "serverActorSystem",
+                                AkkaUtils.getAkkaConfig(
+                                        new Configuration(), new HostAndPort("localhost", 0))),
+                        AkkaRpcServiceConfiguration.defaultConfiguration());
+
+        final AkkaRpcService clientAkkaRpcService =
+                new AkkaRpcService(
+                        AkkaUtils.createActorSystem(
+                                "clientActorSystem",
+                                AkkaUtils.getAkkaConfig(
+                                        new Configuration(), new HostAndPort("localhost", 0))),
+                        AkkaRpcServiceConfiguration.defaultConfiguration());
+
+        try {
+            final DeserializatonFailingEndpoint rpcEndpoint =
+                    new DeserializatonFailingEndpoint(serverAkkaRpcService);
+            rpcEndpoint.start();
+
+            final DeserializatonFailingGateway rpcGateway =
+                    rpcEndpoint.getSelfGateway(DeserializatonFailingGateway.class);
+
+            final DeserializatonFailingGateway connect =
+                    clientAkkaRpcService
+                            .connect(rpcGateway.getAddress(), DeserializatonFailingGateway.class)
+                            .get();
+
+            assertThat(
+                    connect.doStuff(),
+                    FlinkMatchers.futureWillCompleteExceptionally(
+                            RpcException.class, Duration.ofHours(1)));
+        } finally {
+            RpcUtils.terminateRpcService(clientAkkaRpcService, timeout);
+            RpcUtils.terminateRpcService(serverAkkaRpcService, timeout);
+        }
+    }
+
     /** Tests that exception thrown in the onStop method are returned by the termination future. */
     @Test
     public void testOnStopExceptionPropagation() throws Exception {
@@ -748,6 +798,32 @@ public class AkkaRpcActorTest extends TestLogger {
 
     // ------------------------------------------------------------------------
 
+    private interface DeserializatonFailingGateway extends RpcGateway {
+        CompletableFuture<DeserializationFailingObject> doStuff();
+    }
+
+    private static class DeserializatonFailingEndpoint extends RpcEndpoint
+            implements DeserializatonFailingGateway {
+
+        protected DeserializatonFailingEndpoint(RpcService rpcService) {
+            super(rpcService);
+        }
+
+        @Override
+        public CompletableFuture<DeserializationFailingObject> doStuff() {
+            return CompletableFuture.completedFuture(new DeserializationFailingObject());
+        }
+    }
+
+    private static class DeserializationFailingObject implements Serializable {
+        private void readObject(ObjectInputStream aInputStream)
+                throws ClassNotFoundException, IOException {
+            throw new ClassNotFoundException("test exception");
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
     private static class SimpleRpcEndpoint extends RpcEndpoint implements RpcGateway {
 
         protected SimpleRpcEndpoint(RpcService rpcService, String endpointId) {