You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/07/03 23:03:35 UTC

[flink] 03/04: [hotfix][runtime] Make ComponentMainThreadExecutorServiceAdapter accept ScheduledExecutor

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ba12e9af4f6af3bff1ae3298e9aaaf1edbdff744
Author: Gary Yao <ga...@apache.org>
AuthorDate: Wed Jul 3 20:19:16 2019 +0200

    [hotfix][runtime] Make ComponentMainThreadExecutorServiceAdapter accept ScheduledExecutor
---
 .../ComponentMainThreadExecutorServiceAdapter.java | 60 ++++++++++++++++++----
 1 file changed, 51 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
index dbd94ab..c71675a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
@@ -18,31 +18,73 @@
 
 package org.apache.flink.runtime.concurrent;
 
-import javax.annotation.Nonnull;
+import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Adapter class for a {@link ScheduledExecutorService} which shall be used as a
+ * Adapter class for a {@link ScheduledExecutorService} or {@link ScheduledExecutor} which shall be used as a
  * {@link ComponentMainThreadExecutor}. It enhances the given executor with an assert that the current thread is the
  * main thread of the executor.
  */
-public class ComponentMainThreadExecutorServiceAdapter
-	extends ScheduledExecutorServiceAdapter implements ComponentMainThreadExecutor {
+public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainThreadExecutor {
+
+	private final ScheduledExecutor scheduledExecutor;
 
 	/** A runnable that should assert that the current thread is the expected main thread. */
-	@Nonnull
 	private final Runnable mainThreadCheck;
 
 	public ComponentMainThreadExecutorServiceAdapter(
-		@Nonnull ScheduledExecutorService scheduledExecutorService,
-		@Nonnull Runnable mainThreadCheck) {
-		super(scheduledExecutorService);
-		this.mainThreadCheck = mainThreadCheck;
+			final ScheduledExecutorService scheduledExecutorService,
+			final Runnable mainThreadCheck) {
+		this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThreadCheck);
+	}
+
+	public ComponentMainThreadExecutorServiceAdapter(
+			final ScheduledExecutor scheduledExecutorService,
+			final Thread mainThread) {
+		this(scheduledExecutorService, () -> MainThreadValidatorUtil.isRunningInExpectedThread(mainThread));
+	}
+
+	private ComponentMainThreadExecutorServiceAdapter(
+			final ScheduledExecutor scheduledExecutor,
+			final Runnable mainThreadCheck) {
+		this.scheduledExecutor = checkNotNull(scheduledExecutor);
+		this.mainThreadCheck = checkNotNull(mainThreadCheck);
 	}
 
 	@Override
 	public void assertRunningInMainThread() {
 		mainThreadCheck.run();
 	}
+
+	@Override
+	public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+		return scheduledExecutor.schedule(command, delay, unit);
+	}
+
+	@Override
+	public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
+		return scheduledExecutor.schedule(callable, delay, unit);
+	}
+
+	@Override
+	public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
+		return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, period, unit);
+	}
+
+	@Override
+	public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
+		return scheduledExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+	}
+
+	@Override
+	public void execute(final Runnable command) {
+		scheduledExecutor.execute(command);
+	}
 }