You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/09/13 07:10:58 UTC

[flink] branch release-1.16 updated: [FLINK-29253][runtime] Removes synchronous close call from DefaultJobManagerRunnerRegistry#localCleanupAsync

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new ef5941e36c0 [FLINK-29253][runtime] Removes synchronous close call from DefaultJobManagerRunnerRegistry#localCleanupAsync
ef5941e36c0 is described below

commit ef5941e36c06070b49b4557e77ddb6750f2af2af
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Sep 12 11:35:44 2022 +0200

    [FLINK-29253][runtime] Removes synchronous close call from DefaultJobManagerRunnerRegistry#localCleanupAsync
    
    localCleanupAsync is meant to be called from the Dispatcher's main thread. Any blocking
    calls should be avoided here. Instead, we could use closeAsync which is implemented by
    JobManagerRunner.
    
    Two additional tests needed to be touched because the previously used
    AutoCloseableAsync#close method added another FlinKException to the Stacktrace.
---
 .../DefaultJobManagerRunnerRegistry.java           |  6 +---
 .../DefaultJobManagerRunnerRegistryTest.java       | 32 ++++++++++++++++------
 2 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
index 16ab2cf985c..5c6cdfc3abb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
@@ -85,11 +85,7 @@ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry
     @Override
     public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) {
         if (isRegistered(jobId)) {
-            try {
-                unregister(jobId).close();
-            } catch (Exception e) {
-                return FutureUtils.completedExceptionally(e);
-            }
+            return unregister(jobId).closeAsync();
         }
 
         return FutureUtils.completedVoidFuture();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
index 1d4cec95640..cce8ba21357 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.concurrent.UnsupportedOperationExecutor;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -161,10 +161,7 @@ public class DefaultJobManagerRunnerRegistryTest {
                 .failsWithin(Duration.ZERO)
                 .withThrowableOfType(ExecutionException.class)
                 .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
-                .hasExactlyElementsOfTypes(
-                        ExecutionException.class,
-                        FlinkException.class,
-                        expectedException.getClass())
+                .hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
         assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
@@ -200,14 +197,31 @@ public class DefaultJobManagerRunnerRegistryTest {
                 .failsWithin(Duration.ZERO)
                 .withThrowableOfType(ExecutionException.class)
                 .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
-                .hasExactlyElementsOfTypes(
-                        ExecutionException.class,
-                        FlinkException.class,
-                        expectedException.getClass())
+                .hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
     }
 
+    @Test
+    public void testLocalCleanupAsyncNonBlocking() {
+        final TestingJobManagerRunner jobManagerRunner =
+                TestingJobManagerRunner.newBuilder().setBlockingTermination(true).build();
+        testInstance.register(jobManagerRunner);
+
+        // this call shouldn't block
+        final CompletableFuture<Void> cleanupFuture =
+                testInstance.localCleanupAsync(
+                        jobManagerRunner.getJobID(), UnsupportedOperationExecutor.INSTANCE);
+
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+        assertThat(jobManagerRunner.getTerminationFuture()).isNotCompleted();
+        assertThat(cleanupFuture).isNotCompleted();
+
+        jobManagerRunner.getTerminationFuture().complete(null);
+
+        assertThat(cleanupFuture).isCompleted();
+    }
+
     private TestingJobManagerRunner registerTestingJobManagerRunner() {
         final TestingJobManagerRunner jobManagerRunner =
                 TestingJobManagerRunner.newBuilder().build();