You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/07/03 23:03:35 UTC
[flink] 03/04: [hotfix][runtime] Make ComponentMainThreadExecutorServiceAdapter accept ScheduledExecutor
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ba12e9af4f6af3bff1ae3298e9aaaf1edbdff744
Author: Gary Yao <ga...@apache.org>
AuthorDate: Wed Jul 3 20:19:16 2019 +0200
[hotfix][runtime] Make ComponentMainThreadExecutorServiceAdapter accept ScheduledExecutor
---
.../ComponentMainThreadExecutorServiceAdapter.java | 60 ++++++++++++++++++----
1 file changed, 51 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
index dbd94ab..c71675a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
@@ -18,31 +18,73 @@
package org.apache.flink.runtime.concurrent;
-import javax.annotation.Nonnull;
+import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
+import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * Adapter class for a {@link ScheduledExecutorService} which shall be used as a
+ * Adapter class for a {@link ScheduledExecutorService} or {@link ScheduledExecutor} which shall be used as a
* {@link ComponentMainThreadExecutor}. It enhances the given executor with an assert that the current thread is the
* main thread of the executor.
*/
-public class ComponentMainThreadExecutorServiceAdapter
- extends ScheduledExecutorServiceAdapter implements ComponentMainThreadExecutor {
+public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainThreadExecutor {
+
+ private final ScheduledExecutor scheduledExecutor;
/** A runnable that should assert that the current thread is the expected main thread. */
- @Nonnull
private final Runnable mainThreadCheck;
public ComponentMainThreadExecutorServiceAdapter(
- @Nonnull ScheduledExecutorService scheduledExecutorService,
- @Nonnull Runnable mainThreadCheck) {
- super(scheduledExecutorService);
- this.mainThreadCheck = mainThreadCheck;
+ final ScheduledExecutorService scheduledExecutorService,
+ final Runnable mainThreadCheck) {
+ this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThreadCheck);
+ }
+
+ public ComponentMainThreadExecutorServiceAdapter(
+ final ScheduledExecutor scheduledExecutorService,
+ final Thread mainThread) {
+ this(scheduledExecutorService, () -> MainThreadValidatorUtil.isRunningInExpectedThread(mainThread));
+ }
+
+ private ComponentMainThreadExecutorServiceAdapter(
+ final ScheduledExecutor scheduledExecutor,
+ final Runnable mainThreadCheck) {
+ this.scheduledExecutor = checkNotNull(scheduledExecutor);
+ this.mainThreadCheck = checkNotNull(mainThreadCheck);
}
@Override
public void assertRunningInMainThread() {
mainThreadCheck.run();
}
+
+ @Override
+ public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+ return scheduledExecutor.schedule(command, delay, unit);
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
+ return scheduledExecutor.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
+ return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
+ return scheduledExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ @Override
+ public void execute(final Runnable command) {
+ scheduledExecutor.execute(command);
+ }
}