You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/08/29 15:13:36 UTC

[flink] branch master updated (7ac37c08918 -> d0434e698fc)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 7ac37c08918 [FLINK-29062][build] Fix protobuf plugin proxy issue on flink-protobuf module.
     new aa3c124b536 [FLINK-27030][tests] Prevent race-condition
     new d0434e698fc [hotfix][tests] Minor cleanup

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  | 17 ++++++++++---
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 29 +++++++++++-----------
 2 files changed, 29 insertions(+), 17 deletions(-)


[flink] 01/02: [FLINK-27030][tests] Prevent race-condition

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa3c124b536adb42d219a57594d399379397bf0a
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Aug 29 13:28:25 2022 +0200

    [FLINK-27030][tests] Prevent race-condition
---
 .../java/org/apache/flink/runtime/rpc/RpcEndpoint.java  | 17 ++++++++++++++---
 .../org/apache/flink/runtime/rpc/RpcEndpointTest.java   | 12 +++++++-----
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 2aa90b7df91..1eb32a0549b 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledFutureAdapter;
@@ -470,11 +471,21 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 
         MainThreadExecutor(
                 MainThreadExecutable gateway, Runnable mainThreadCheck, String endpointId) {
+            this(
+                    gateway,
+                    mainThreadCheck,
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ExecutorThreadFactory(endpointId + "-main-scheduler")));
+        }
+
+        @VisibleForTesting
+        MainThreadExecutor(
+                MainThreadExecutable gateway,
+                Runnable mainThreadCheck,
+                ScheduledExecutorService mainScheduledExecutor) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-            this.mainScheduledExecutor =
-                    Executors.newSingleThreadScheduledExecutor(
-                            new ExecutorThreadFactory(endpointId + "-main-scheduler"));
+            this.mainScheduledExecutor = mainScheduledExecutor;
         }
 
         @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 20e45ab92b8..07216016a92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.junit.jupiter.api.AfterAll;
@@ -44,7 +45,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 /** Tests for the RpcEndpoint, its self gateways and MainThreadExecutor scheduling command. */
 @ExtendWith(TestLoggerExtension.class)
@@ -412,20 +412,22 @@ public class RpcEndpointTest {
     private static void testCancelScheduledTask(
             BiFunction<RpcEndpoint.MainThreadExecutor, CompletableFuture<Long>, ScheduledFuture<?>>
                     scheduler) {
-        final String endpointId = "foobar";
-
         final MainThreadExecutable mainThreadExecutable =
                 new TestMainThreadExecutable(Runnable::run);
 
+        final ManuallyTriggeredScheduledExecutorService manuallyTriggeredScheduledExecutorService =
+                new ManuallyTriggeredScheduledExecutorService();
+
         final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
-                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}, endpointId);
+                new RpcEndpoint.MainThreadExecutor(
+                        mainThreadExecutable, () -> {}, manuallyTriggeredScheduledExecutorService);
         final CompletableFuture<Long> actualDelayMsFuture = new CompletableFuture<>();
 
         ScheduledFuture<?> scheduledFuture =
                 scheduler.apply(mainThreadExecutor, actualDelayMsFuture);
         scheduledFuture.cancel(true);
+        manuallyTriggeredScheduledExecutorService.triggerAllNonPeriodicTasks();
 
-        assumeTrue(!actualDelayMsFuture.isDone(), "The command is done and no need to cancel it.");
         assertTrue(scheduledFuture.isCancelled());
         assertFalse(actualDelayMsFuture.isDone());
         mainThreadExecutor.close();


[flink] 02/02: [hotfix][tests] Minor cleanup

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d0434e698fc39c89aae661aae81efa777ced7ce5
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Aug 29 13:31:05 2022 +0200

    [hotfix][tests] Minor cleanup
---
 .../org/apache/flink/runtime/rpc/RpcEndpointTest.java   | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 07216016a92..f412c796e1c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -321,11 +321,11 @@ public class RpcEndpointTest {
     @Test
     public void testCancelScheduledRunnable() throws Exception {
         testCancelScheduledTask(
-                (mainThreadExecutor, longCompletableFuture) -> {
+                (mainThreadExecutor, future) -> {
                     final Duration delayDuration = Duration.ofMillis(2);
                     return mainThreadExecutor.schedule(
                             () -> {
-                                longCompletableFuture.complete(delayDuration.toMillis());
+                                future.complete(null);
                             },
                             delayDuration.toMillis(),
                             TimeUnit.MILLISECONDS);
@@ -359,11 +359,11 @@ public class RpcEndpointTest {
     @Test
     public void testCancelScheduledCallable() {
         testCancelScheduledTask(
-                (mainThreadExecutor, longCompletableFuture) -> {
+                (mainThreadExecutor, future) -> {
                     final Duration delayDuration = Duration.ofMillis(2);
                     return mainThreadExecutor.schedule(
                             () -> {
-                                longCompletableFuture.complete(delayDuration.toMillis());
+                                future.complete(null);
                                 return null;
                             },
                             delayDuration.toMillis(),
@@ -410,7 +410,7 @@ public class RpcEndpointTest {
     }
 
     private static void testCancelScheduledTask(
-            BiFunction<RpcEndpoint.MainThreadExecutor, CompletableFuture<Long>, ScheduledFuture<?>>
+            BiFunction<RpcEndpoint.MainThreadExecutor, CompletableFuture<Void>, ScheduledFuture<?>>
                     scheduler) {
         final MainThreadExecutable mainThreadExecutable =
                 new TestMainThreadExecutable(Runnable::run);
@@ -421,15 +421,14 @@ public class RpcEndpointTest {
         final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
                 new RpcEndpoint.MainThreadExecutor(
                         mainThreadExecutable, () -> {}, manuallyTriggeredScheduledExecutorService);
-        final CompletableFuture<Long> actualDelayMsFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> actionFuture = new CompletableFuture<>();
 
-        ScheduledFuture<?> scheduledFuture =
-                scheduler.apply(mainThreadExecutor, actualDelayMsFuture);
+        ScheduledFuture<?> scheduledFuture = scheduler.apply(mainThreadExecutor, actionFuture);
         scheduledFuture.cancel(true);
         manuallyTriggeredScheduledExecutorService.triggerAllNonPeriodicTasks();
 
         assertTrue(scheduledFuture.isCancelled());
-        assertFalse(actualDelayMsFuture.isDone());
+        assertFalse(actionFuture.isDone());
         mainThreadExecutor.close();
     }