You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/04 08:25:33 UTC

[flink] branch master updated (664fdae -> e932aad)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 664fdae  [FLINK-24714][table-api-java] Validate partition keys for ResolvedCatalogTable
     new 44e2891  [FLINK-24706][tests] Increase restart delay to 5 seconds
     new 250cc81  [FLINK-24706][coordination] Strip completion exception in HeartbeatManager
     new e932aad  [FLINK-24706][rpc] Forward deserialization errors to returned future

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/rpc/akka/AkkaInvocationHandler.java    | 13 ++--
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   | 76 ++++++++++++++++++++++
 .../runtime/heartbeat/HeartbeatManagerImpl.java    |  4 +-
 ...skManagerProcessFailureBatchRecoveryITCase.java |  2 +-
 4 files changed, 89 insertions(+), 6 deletions(-)

[flink] 02/03: [FLINK-24706][coordination] Strip completion exception in HeartbeatManager

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 250cc81adbcf1264defa00ef97fe0f26241a6808
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Nov 2 14:05:39 2021 +0100

    [FLINK-24706][coordination] Strip completion exception in HeartbeatManager
---
 .../java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 3063581..71e7c2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.heartbeat;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -244,7 +245,8 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
     protected BiConsumer<Void, Throwable> handleHeartbeatRpc(ResourceID heartbeatTarget) {
         return (unused, failure) -> {
             if (failure != null) {
-                handleHeartbeatRpcFailure(heartbeatTarget, failure);
+                handleHeartbeatRpcFailure(
+                        heartbeatTarget, ExceptionUtils.stripCompletionException(failure));
             } else {
                 handleHeartbeatRpcSuccess(heartbeatTarget);
             }

[flink] 03/03: [FLINK-24706][rpc] Forward deserialization errors to returned future

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e932aad8ff5ac953f1e73193dda22da9f242851e
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Oct 29 15:34:53 2021 +0200

    [FLINK-24706][rpc] Forward deserialization errors to returned future
---
 .../runtime/rpc/akka/AkkaInvocationHandler.java    | 13 ++--
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   | 76 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index b4fd6ab..db73771 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -237,7 +237,10 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
             final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null;
 
             // execute an asynchronous call
-            final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
+            final CompletableFuture<?> resultFuture =
+                    ask(rpcInvocation, futureTimeout)
+                            .thenApply(
+                                    resultValue -> deserializeValueIfNeeded(resultValue, method));
 
             final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
             resultFuture.whenComplete(
@@ -245,10 +248,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
                         if (failure != null) {
                             completableFuture.completeExceptionally(
                                     resolveTimeoutException(
-                                            failure, callStackCapture, address, rpcInvocation));
+                                            ExceptionUtils.stripCompletionException(failure),
+                                            callStackCapture,
+                                            address,
+                                            rpcInvocation));
                         } else {
-                            completableFuture.complete(
-                                    deserializeValueIfNeeded(resultValue, method));
+                            completableFuture.complete(resultValue);
                         }
                     });
 
diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index e99b571..29f973e 100644
--- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -53,6 +54,8 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
 import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
@@ -237,6 +240,53 @@ public class AkkaRpcActorTest extends TestLogger {
         }
     }
 
+    /**
+     * Tests that the AkkaInvocationHandler properly fails the returned future if the response
+     * cannot be deserialized.
+     */
+    @Test
+    public void testResultFutureFailsOnDeserializationError() throws Exception {
+        // setup 2 actor systems and rpc services that support remote connections (for which RPCs go
+        // through serialization)
+        final AkkaRpcService serverAkkaRpcService =
+                new AkkaRpcService(
+                        AkkaUtils.createActorSystem(
+                                "serverActorSystem",
+                                AkkaUtils.getAkkaConfig(
+                                        new Configuration(), new HostAndPort("localhost", 0))),
+                        AkkaRpcServiceConfiguration.defaultConfiguration());
+
+        final AkkaRpcService clientAkkaRpcService =
+                new AkkaRpcService(
+                        AkkaUtils.createActorSystem(
+                                "clientActorSystem",
+                                AkkaUtils.getAkkaConfig(
+                                        new Configuration(), new HostAndPort("localhost", 0))),
+                        AkkaRpcServiceConfiguration.defaultConfiguration());
+
+        try {
+            final DeserializatonFailingEndpoint rpcEndpoint =
+                    new DeserializatonFailingEndpoint(serverAkkaRpcService);
+            rpcEndpoint.start();
+
+            final DeserializatonFailingGateway rpcGateway =
+                    rpcEndpoint.getSelfGateway(DeserializatonFailingGateway.class);
+
+            final DeserializatonFailingGateway connect =
+                    clientAkkaRpcService
+                            .connect(rpcGateway.getAddress(), DeserializatonFailingGateway.class)
+                            .get();
+
+            assertThat(
+                    connect.doStuff(),
+                    FlinkMatchers.futureWillCompleteExceptionally(
+                            RpcException.class, Duration.ofHours(1)));
+        } finally {
+            RpcUtils.terminateRpcService(clientAkkaRpcService, timeout);
+            RpcUtils.terminateRpcService(serverAkkaRpcService, timeout);
+        }
+    }
+
     /** Tests that exception thrown in the onStop method are returned by the termination future. */
     @Test
     public void testOnStopExceptionPropagation() throws Exception {
@@ -748,6 +798,32 @@ public class AkkaRpcActorTest extends TestLogger {
 
     // ------------------------------------------------------------------------
 
+    private interface DeserializatonFailingGateway extends RpcGateway {
+        CompletableFuture<DeserializationFailingObject> doStuff();
+    }
+
+    private static class DeserializatonFailingEndpoint extends RpcEndpoint
+            implements DeserializatonFailingGateway {
+
+        protected DeserializatonFailingEndpoint(RpcService rpcService) {
+            super(rpcService);
+        }
+
+        @Override
+        public CompletableFuture<DeserializationFailingObject> doStuff() {
+            return CompletableFuture.completedFuture(new DeserializationFailingObject());
+        }
+    }
+
+    private static class DeserializationFailingObject implements Serializable {
+        private void readObject(ObjectInputStream aInputStream)
+                throws ClassNotFoundException, IOException {
+            throw new ClassNotFoundException("test exception");
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
     private static class SimpleRpcEndpoint extends RpcEndpoint implements RpcGateway {
 
         protected SimpleRpcEndpoint(RpcService rpcService, String endpointId) {

[flink] 01/03: [FLINK-24706][tests] Increase restart delay to 5 seconds

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 44e289100f2294c3e5deeccd4e84b9df4a6656de
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Nov 2 15:19:47 2021 +0100

    [FLINK-24706][tests] Increase restart delay to 5 seconds
---
 .../test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
index 5bbd3fb..aca91ea 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -67,7 +67,7 @@ public class TaskManagerProcessFailureBatchRecoveryITCase
         ExecutionEnvironment env =
                 ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
         env.setParallelism(PARALLELISM);
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1500L));
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 5000L));
         env.getConfig().setExecutionMode(executionMode);
 
         final long numElements = 100000L;