You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/08/29 15:13:37 UTC

[flink] 01/02: [FLINK-27030][tests] Prevent race-condition

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa3c124b536adb42d219a57594d399379397bf0a
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Aug 29 13:28:25 2022 +0200

    [FLINK-27030][tests] Prevent race-condition
---
 .../java/org/apache/flink/runtime/rpc/RpcEndpoint.java  | 17 ++++++++++++++---
 .../org/apache/flink/runtime/rpc/RpcEndpointTest.java   | 12 +++++++-----
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 2aa90b7df91..1eb32a0549b 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledFutureAdapter;
@@ -470,11 +471,21 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 
         MainThreadExecutor(
                 MainThreadExecutable gateway, Runnable mainThreadCheck, String endpointId) {
+            this(
+                    gateway,
+                    mainThreadCheck,
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ExecutorThreadFactory(endpointId + "-main-scheduler")));
+        }
+
+        @VisibleForTesting
+        MainThreadExecutor(
+                MainThreadExecutable gateway,
+                Runnable mainThreadCheck,
+                ScheduledExecutorService mainScheduledExecutor) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-            this.mainScheduledExecutor =
-                    Executors.newSingleThreadScheduledExecutor(
-                            new ExecutorThreadFactory(endpointId + "-main-scheduler"));
+            this.mainScheduledExecutor = mainScheduledExecutor;
         }
 
         @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 20e45ab92b8..07216016a92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.junit.jupiter.api.AfterAll;
@@ -44,7 +45,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 /** Tests for the RpcEndpoint, its self gateways and MainThreadExecutor scheduling command. */
 @ExtendWith(TestLoggerExtension.class)
@@ -412,20 +412,22 @@ public class RpcEndpointTest {
     private static void testCancelScheduledTask(
             BiFunction<RpcEndpoint.MainThreadExecutor, CompletableFuture<Long>, ScheduledFuture<?>>
                     scheduler) {
-        final String endpointId = "foobar";
-
         final MainThreadExecutable mainThreadExecutable =
                 new TestMainThreadExecutable(Runnable::run);
 
+        final ManuallyTriggeredScheduledExecutorService manuallyTriggeredScheduledExecutorService =
+                new ManuallyTriggeredScheduledExecutorService();
+
         final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
-                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}, endpointId);
+                new RpcEndpoint.MainThreadExecutor(
+                        mainThreadExecutable, () -> {}, manuallyTriggeredScheduledExecutorService);
         final CompletableFuture<Long> actualDelayMsFuture = new CompletableFuture<>();
 
         ScheduledFuture<?> scheduledFuture =
                 scheduler.apply(mainThreadExecutor, actualDelayMsFuture);
         scheduledFuture.cancel(true);
+        manuallyTriggeredScheduledExecutorService.triggerAllNonPeriodicTasks();
 
-        assumeTrue(!actualDelayMsFuture.isDone(), "The command is done and no need to cancel it.");
         assertTrue(scheduledFuture.isCancelled());
         assertFalse(actualDelayMsFuture.isDone());
         mainThreadExecutor.close();