You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/09/13 07:10:37 UTC

[flink] branch master updated (fa199c1b514 -> dbae7aefb21)

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from fa199c1b514 Revert "[FLINK-19869][connectors/jdbc] Add support for postgres UUID type"
     new 4e99d64b491 [FLINK-29253][runtime] Removes synchronous close call from DefaultJobManagerRunnerRegistry#localCleanupAsync
     new dbae7aefb21 [hotfix][tests] Cleaned up JDK/assertj plugin warnings

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../DefaultJobManagerRunnerRegistry.java           |  6 +---
 .../DefaultJobManagerRunnerRegistryTest.java       | 42 ++++++++++++++--------
 2 files changed, 29 insertions(+), 19 deletions(-)


[flink] 01/02: [FLINK-29253][runtime] Removes synchronous close call from DefaultJobManagerRunnerRegistry#localCleanupAsync

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4e99d64b4914ca5fbb1fe338455a8f653c164172
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Sep 12 11:35:44 2022 +0200

    [FLINK-29253][runtime] Removes synchronous close call from DefaultJobManagerRunnerRegistry#localCleanupAsync
    
    localCleanupAsync is meant to be called from the Dispatcher's main thread. Any blocking
    calls should be avoided here. Instead, we could use closeAsync which is implemented by
    JobManagerRunner.
    
    Two additional tests needed to be touched because the previously used
    AutoCloseableAsync#close method added another FlinKException to the Stacktrace.
---
 .../DefaultJobManagerRunnerRegistry.java           |  6 +---
 .../DefaultJobManagerRunnerRegistryTest.java       | 32 ++++++++++++++++------
 2 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
index 16ab2cf985c..5c6cdfc3abb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java
@@ -85,11 +85,7 @@ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry
     @Override
     public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) {
         if (isRegistered(jobId)) {
-            try {
-                unregister(jobId).close();
-            } catch (Exception e) {
-                return FutureUtils.completedExceptionally(e);
-            }
+            return unregister(jobId).closeAsync();
         }
 
         return FutureUtils.completedVoidFuture();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
index 1d4cec95640..cce8ba21357 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.concurrent.UnsupportedOperationExecutor;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -161,10 +161,7 @@ public class DefaultJobManagerRunnerRegistryTest {
                 .failsWithin(Duration.ZERO)
                 .withThrowableOfType(ExecutionException.class)
                 .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
-                .hasExactlyElementsOfTypes(
-                        ExecutionException.class,
-                        FlinkException.class,
-                        expectedException.getClass())
+                .hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
         assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
@@ -200,14 +197,31 @@ public class DefaultJobManagerRunnerRegistryTest {
                 .failsWithin(Duration.ZERO)
                 .withThrowableOfType(ExecutionException.class)
                 .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
-                .hasExactlyElementsOfTypes(
-                        ExecutionException.class,
-                        FlinkException.class,
-                        expectedException.getClass())
+                .hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
     }
 
+    @Test
+    public void testLocalCleanupAsyncNonBlocking() {
+        final TestingJobManagerRunner jobManagerRunner =
+                TestingJobManagerRunner.newBuilder().setBlockingTermination(true).build();
+        testInstance.register(jobManagerRunner);
+
+        // this call shouldn't block
+        final CompletableFuture<Void> cleanupFuture =
+                testInstance.localCleanupAsync(
+                        jobManagerRunner.getJobID(), UnsupportedOperationExecutor.INSTANCE);
+
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+        assertThat(jobManagerRunner.getTerminationFuture()).isNotCompleted();
+        assertThat(cleanupFuture).isNotCompleted();
+
+        jobManagerRunner.getTerminationFuture().complete(null);
+
+        assertThat(cleanupFuture).isCompleted();
+    }
+
     private TestingJobManagerRunner registerTestingJobManagerRunner() {
         final TestingJobManagerRunner jobManagerRunner =
                 TestingJobManagerRunner.newBuilder().build();


[flink] 02/02: [hotfix][tests] Cleaned up JDK/assertj plugin warnings

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dbae7aefb2111a4805f9aecbbd3ba2422e353531
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Sep 12 11:37:07 2022 +0200

    [hotfix][tests] Cleaned up JDK/assertj plugin warnings
---
 .../dispatcher/DefaultJobManagerRunnerRegistryTest.java        | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
index cce8ba21357..be48235d7e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java
@@ -101,9 +101,9 @@ public class DefaultJobManagerRunnerRegistryTest {
 
     @Test
     public void size() {
-        assertThat(testInstance.size()).isEqualTo(0);
+        assertThat(testInstance.size()).isZero();
         testInstance.register(TestingJobManagerRunner.newBuilder().build());
-        assertThat(testInstance.size()).isEqualTo(1);
+        assertThat(testInstance.size()).isOne();
         testInstance.register(TestingJobManagerRunner.newBuilder().build());
         assertThat(testInstance.size()).isEqualTo(2);
     }
@@ -134,7 +134,7 @@ public class DefaultJobManagerRunnerRegistryTest {
     }
 
     @Test
-    public void testSuccessfulLocalCleanup() throws Throwable {
+    public void testSuccessfulLocalCleanup() {
         final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
 
         assertThat(
@@ -168,7 +168,7 @@ public class DefaultJobManagerRunnerRegistryTest {
     }
 
     @Test
-    public void testSuccessfulLocalCleanupAsync() throws Exception {
+    public void testSuccessfulLocalCleanupAsync() {
         final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
 
         final CompletableFuture<Void> cleanupResult =
@@ -179,7 +179,7 @@ public class DefaultJobManagerRunnerRegistryTest {
     }
 
     @Test
-    public void testFailingLocalCleanupAsync() throws Exception {
+    public void testFailingLocalCleanupAsync() {
         final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner();
 
         assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue();