You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:27:54 UTC
[flink] 05/13: [FLINK-17671][tests][refactor] Simplify
ManuallyTriggeredScheduledExecutor for better debuggability.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8ad1ba3b7cc9d80e7bcd09e140950bce971d7657
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 12 16:42:47 2020 +0200
[FLINK-17671][tests][refactor] Simplify ManuallyTriggeredScheduledExecutor for better debuggability.
---
.../runtime/concurrent/ManuallyTriggeredScheduledExecutor.java | 9 +--------
1 file changed, 1 insertion(+), 8 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
index 04d1f48..0453bcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
@@ -32,7 +32,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -43,8 +42,6 @@ import java.util.stream.Collectors;
*/
public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor {
- private final Executor executorDelegate;
-
private final ArrayDeque<Runnable> queuedRunnables = new ArrayDeque<>();
private final ConcurrentLinkedQueue<ScheduledTask<?>> nonPeriodicScheduledTasks =
@@ -53,10 +50,6 @@ public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor {
private final ConcurrentLinkedQueue<ScheduledTask<?>> periodicScheduledTasks =
new ConcurrentLinkedQueue<>();
- public ManuallyTriggeredScheduledExecutor() {
- this.executorDelegate = Runnable::run;
- }
-
@Override
public void execute(@Nonnull Runnable command) {
synchronized (queuedRunnables) {
@@ -82,7 +75,7 @@ public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor {
next = queuedRunnables.removeFirst();
}
- CompletableFuture.runAsync(next, executorDelegate).join();
+ next.run();
}
/**