You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/08/29 15:13:37 UTC
[flink] 01/02: [FLINK-27030][tests] Prevent race-condition
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit aa3c124b536adb42d219a57594d399379397bf0a
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Aug 29 13:28:25 2022 +0200
[FLINK-27030][tests] Prevent race-condition
---
.../java/org/apache/flink/runtime/rpc/RpcEndpoint.java | 17 ++++++++++++++---
.../org/apache/flink/runtime/rpc/RpcEndpointTest.java | 12 +++++++-----
2 files changed, 21 insertions(+), 8 deletions(-)
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 2aa90b7df91..1eb32a0549b 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rpc;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ScheduledFutureAdapter;
@@ -470,11 +471,21 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
MainThreadExecutor(
MainThreadExecutable gateway, Runnable mainThreadCheck, String endpointId) {
+ this(
+ gateway,
+ mainThreadCheck,
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory(endpointId + "-main-scheduler")));
+ }
+
+ @VisibleForTesting
+ MainThreadExecutor(
+ MainThreadExecutable gateway,
+ Runnable mainThreadCheck,
+ ScheduledExecutorService mainScheduledExecutor) {
this.gateway = Preconditions.checkNotNull(gateway);
this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
- this.mainScheduledExecutor =
- Executors.newSingleThreadScheduledExecutor(
- new ExecutorThreadFactory(endpointId + "-main-scheduler"));
+ this.mainScheduledExecutor = mainScheduledExecutor;
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 20e45ab92b8..07216016a92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rpc;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.AfterAll;
@@ -44,7 +45,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
/** Tests for the RpcEndpoint, its self gateways and MainThreadExecutor scheduling command. */
@ExtendWith(TestLoggerExtension.class)
@@ -412,20 +412,22 @@ public class RpcEndpointTest {
private static void testCancelScheduledTask(
BiFunction<RpcEndpoint.MainThreadExecutor, CompletableFuture<Long>, ScheduledFuture<?>>
scheduler) {
- final String endpointId = "foobar";
-
final MainThreadExecutable mainThreadExecutable =
new TestMainThreadExecutable(Runnable::run);
+ final ManuallyTriggeredScheduledExecutorService manuallyTriggeredScheduledExecutorService =
+ new ManuallyTriggeredScheduledExecutorService();
+
final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
- new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}, endpointId);
+ new RpcEndpoint.MainThreadExecutor(
+ mainThreadExecutable, () -> {}, manuallyTriggeredScheduledExecutorService);
final CompletableFuture<Long> actualDelayMsFuture = new CompletableFuture<>();
ScheduledFuture<?> scheduledFuture =
scheduler.apply(mainThreadExecutor, actualDelayMsFuture);
scheduledFuture.cancel(true);
+ manuallyTriggeredScheduledExecutorService.triggerAllNonPeriodicTasks();
- assumeTrue(!actualDelayMsFuture.isDone(), "The command is done and no need to cancel it.");
assertTrue(scheduledFuture.isCancelled());
assertFalse(actualDelayMsFuture.isDone());
mainThreadExecutor.close();