You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sn...@apache.org on 2023/07/04 16:43:33 UTC

[flink] branch release-1.16 updated (85e11f7a34f -> e5049a09900)

This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 85e11f7a34f [hotfix] Add missing release note on binding to localhost by default since Flink 1.15
     new 227fe29d9ad [hotfix] Simplify ActorSystem construction
     new e5049a09900 [FLINK-32314][rpc] Ignore classloading errors after actorsystem shutdown

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../TestingUncaughtExceptionHandler.java           |  5 +++
 .../flink/runtime/rpc/akka/RobustActorSystem.java  | 42 ++++++++++++++++++++--
 .../runtime/rpc/akka/RobustActorSystemTest.java    | 28 +++++++++++++++
 3 files changed, 72 insertions(+), 3 deletions(-)


[flink] 02/02: [FLINK-32314][rpc] Ignore classloading errors after actorsystem shutdown

Posted by sn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e5049a09900bacc247ee800f3a2fb1b2b77ab609
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Sat Jun 10 10:31:38 2023 +0200

    [FLINK-32314][rpc] Ignore classloading errors after actorsystem shutdown
---
 .../TestingUncaughtExceptionHandler.java           |  5 +++
 .../flink/runtime/rpc/akka/RobustActorSystem.java  | 38 +++++++++++++++++++++-
 .../runtime/rpc/akka/RobustActorSystemTest.java    | 28 ++++++++++++++++
 3 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/test/java/org/apache/flink/util/concurrent/TestingUncaughtExceptionHandler.java b/flink-core/src/test/java/org/apache/flink/util/concurrent/TestingUncaughtExceptionHandler.java
index 590260fb93c..6c5c95a00c4 100644
--- a/flink-core/src/test/java/org/apache/flink/util/concurrent/TestingUncaughtExceptionHandler.java
+++ b/flink-core/src/test/java/org/apache/flink/util/concurrent/TestingUncaughtExceptionHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.util.concurrent;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -36,4 +37,8 @@ public class TestingUncaughtExceptionHandler implements Thread.UncaughtException
     public Throwable waitForUncaughtException() {
         return uncaughtExceptionFuture.join();
     }
+
+    public Optional<Throwable> findUncaughtExceptionNow() {
+        return Optional.ofNullable(uncaughtExceptionFuture.getNow(null));
+    }
 }
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java
index 17b41b22d22..c4a547127d3 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java
@@ -27,6 +27,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import scala.Option;
 import scala.concurrent.ExecutionContext;
@@ -85,17 +86,52 @@ public abstract class RobustActorSystem extends ActorSystemImpl {
                                 .map(BootstrapSetup::defaultExecutionContext)
                                 .flatMap(RobustActorSystem::toJavaOptional));
 
+        final PostShutdownClassLoadingErrorFilter postShutdownClassLoadingErrorFilter =
+                new PostShutdownClassLoadingErrorFilter(uncaughtExceptionHandler);
+
         final RobustActorSystem robustActorSystem =
                 new RobustActorSystem(name, appConfig, classLoader, defaultEC, setup) {
                     @Override
                     public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {
-                        return uncaughtExceptionHandler;
+                        return postShutdownClassLoadingErrorFilter;
                     }
                 };
+        robustActorSystem.registerOnTermination(
+                postShutdownClassLoadingErrorFilter::notifyShutdownComplete);
+
         robustActorSystem.start();
         return robustActorSystem;
     }
 
+    private static class PostShutdownClassLoadingErrorFilter
+            implements Thread.UncaughtExceptionHandler {
+
+        private final AtomicBoolean shutdownComplete = new AtomicBoolean();
+        private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+
+        public PostShutdownClassLoadingErrorFilter(
+                Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+            this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+        }
+
+        public void notifyShutdownComplete() {
+            shutdownComplete.set(true);
+        }
+
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            if (shutdownComplete.get()
+                    && (e instanceof NoClassDefFoundError || e instanceof ClassNotFoundException)) {
+                // Ignore classloading errors after the actor system has terminated.
+                // Some parts of the akka shutdown procedure are not tied to the actor
+                // system termination future, and can occasionally fail if the rpc
+                // classloader has been closed.
+                return;
+            }
+            uncaughtExceptionHandler.uncaughtException(t, e);
+        }
+    }
+
     private static <T> Optional<T> toJavaOptional(Option<T> option) {
         return Optional.ofNullable(option.getOrElse(() -> null));
     }
diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/RobustActorSystemTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/RobustActorSystemTest.java
index 8a10bd0a79b..8bddf57a34c 100644
--- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/RobustActorSystemTest.java
+++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/RobustActorSystemTest.java
@@ -27,6 +27,8 @@ import akka.japi.pf.ReceiveBuilder;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -84,6 +86,32 @@ class RobustActorSystemTest {
         assertThat(uncaughtException).isSameAs(error);
     }
 
+    @Test
+    void testHonorClassloadingErrorBeforeShutdown() {
+        robustActorSystem
+                .uncaughtExceptionHandler()
+                .uncaughtException(Thread.currentThread(), new NoClassDefFoundError(""));
+
+        assertThat(testingUncaughtExceptionHandler.findUncaughtExceptionNow()).isPresent();
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {NoClassDefFoundError.class, ClassNotFoundException.class})
+    void testIgnoreClassloadingErrorAfterShutdown(Class<? extends Throwable> exceptionClass)
+            throws Exception {
+        // wait for termination
+        robustActorSystem.terminate();
+        robustActorSystem.getWhenTerminated().toCompletableFuture().join();
+
+        robustActorSystem
+                .uncaughtExceptionHandler()
+                .uncaughtException(
+                        Thread.currentThread(),
+                        exceptionClass.getDeclaredConstructor(String.class).newInstance(""));
+
+        assertThat(testingUncaughtExceptionHandler.findUncaughtExceptionNow()).isEmpty();
+    }
+
     private static class UncaughtExceptionActor extends AbstractActor {
         private final Error failure;
 


[flink] 01/02: [hotfix] Simplify ActorSystem construction

Posted by sn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 227fe29d9ad5fa6fcc6b78c653516d7a732ecd10
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Sat Jun 10 10:11:56 2023 +0200

    [hotfix] Simplify ActorSystem construction
    
    The option was never empty.
---
 .../java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java
index 3658c46cdd7..17b41b22d22 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/RobustActorSystem.java
@@ -65,13 +65,13 @@ public abstract class RobustActorSystem extends ActorSystemImpl {
                                 Optional.empty(),
                                 Optional.of(applicationConfig),
                                 Optional.empty())),
-                Option.apply(uncaughtExceptionHandler));
+                uncaughtExceptionHandler);
     }
 
     private static RobustActorSystem create(
             String name,
             ActorSystemSetup setup,
-            Option<Thread.UncaughtExceptionHandler> uncaughtExceptionHandler) {
+            Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
         final Optional<BootstrapSetup> bootstrapSettings = setup.get(BootstrapSetup.class);
         final ClassLoader classLoader = RobustActorSystem.class.getClassLoader();
         final Config appConfig =
@@ -89,7 +89,7 @@ public abstract class RobustActorSystem extends ActorSystemImpl {
                 new RobustActorSystem(name, appConfig, classLoader, defaultEC, setup) {
                     @Override
                     public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {
-                        return uncaughtExceptionHandler.getOrElse(super::uncaughtExceptionHandler);
+                        return uncaughtExceptionHandler;
                     }
                 };
         robustActorSystem.start();