You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:27:54 UTC

[flink] 05/13: [FLINK-17671][tests][refactor] Simplify ManuallyTriggeredScheduledExecutor for better debugability.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8ad1ba3b7cc9d80e7bcd09e140950bce971d7657
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 12 16:42:47 2020 +0200

    [FLINK-17671][tests][refactor] Simplify ManuallyTriggeredScheduledExecutor for better debugability.
---
 .../runtime/concurrent/ManuallyTriggeredScheduledExecutor.java   | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
index 04d1f48..0453bcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
@@ -32,7 +32,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -43,8 +42,6 @@ import java.util.stream.Collectors;
  */
 public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor {
 
-	private final Executor executorDelegate;
-
 	private final ArrayDeque<Runnable> queuedRunnables = new ArrayDeque<>();
 
 	private final ConcurrentLinkedQueue<ScheduledTask<?>> nonPeriodicScheduledTasks =
@@ -53,10 +50,6 @@ public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor {
 	private final ConcurrentLinkedQueue<ScheduledTask<?>> periodicScheduledTasks =
 		new ConcurrentLinkedQueue<>();
 
-	public ManuallyTriggeredScheduledExecutor() {
-		this.executorDelegate = Runnable::run;
-	}
-
 	@Override
 	public void execute(@Nonnull Runnable command) {
 		synchronized (queuedRunnables) {
@@ -82,7 +75,7 @@ public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor {
 			next = queuedRunnables.removeFirst();
 		}
 
-		CompletableFuture.runAsync(next, executorDelegate).join();
+		next.run();
 	}
 
 	/**