You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/12/20 10:04:47 UTC

[flink] 01/02: [FLINK-10461][runtime] Refactor direct executor service

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 17082604cac8ad596ce42d21ef748eea6e5ae2e2
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Sat Dec 15 10:50:44 2018 +0100

    [FLINK-10461][runtime] Refactor direct executor service
    
    Co-authored-by: Andrey Zagrebin <az...@gmail.com>
    Co-authored-by: klion26 <qc...@gmail.com>
---
 .../runtime/concurrent}/DirectExecutorService.java | 74 ++++++++++++++--------
 .../apache/flink/runtime/concurrent/Executors.java | 31 ++++-----
 .../runtime/heartbeat/HeartbeatManagerTest.java    | 13 ++--
 .../streaming/runtime/tasks/StreamTaskTest.java    |  4 +-
 4 files changed, 65 insertions(+), 57 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/DirectExecutorService.java
similarity index 69%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/DirectExecutorService.java
index 1d7c971..c37adbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/DirectExecutorService.java
@@ -16,7 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.util;
+package org.apache.flink.runtime.concurrent;
+
+import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -31,37 +33,42 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-public class DirectExecutorService implements ExecutorService {
-	private boolean _shutdown = false;
+/** The direct executor service directly executes the runnables and the callables in the calling thread. */
+class DirectExecutorService implements ExecutorService {
+	static final DirectExecutorService INSTANCE = new DirectExecutorService();
+
+	private boolean isShutdown = false;
 
 	@Override
 	public void shutdown() {
-		_shutdown = true;
+		isShutdown = true;
 	}
 
 	@Override
+	@Nonnull
 	public List<Runnable> shutdownNow() {
-		_shutdown = true;
+		isShutdown = true;
 		return Collections.emptyList();
 	}
 
 	@Override
 	public boolean isShutdown() {
-		return _shutdown;
+		return isShutdown;
 	}
 
 	@Override
 	public boolean isTerminated() {
-		return _shutdown;
+		return isShutdown;
 	}
 
 	@Override
-	public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-		return _shutdown;
+	public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) {
+		return isShutdown;
 	}
 
 	@Override
-	public <T> Future<T> submit(Callable<T> task) {
+	@Nonnull
+	public <T> Future<T> submit(@Nonnull Callable<T> task) {
 		try {
 			T result = task.call();
 
@@ -72,34 +79,40 @@ public class DirectExecutorService implements ExecutorService {
 	}
 
 	@Override
-	public <T> Future<T> submit(Runnable task, T result) {
+	@Nonnull
+	public <T> Future<T> submit(@Nonnull Runnable task, T result) {
 		task.run();
 
 		return new CompletedFuture<>(result, null);
 	}
 
 	@Override
-	public Future<?> submit(Runnable task) {
+	@Nonnull
+	public Future<?> submit(@Nonnull Runnable task) {
 		task.run();
 		return new CompletedFuture<>(null, null);
 	}
 
 	@Override
-	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+	@Nonnull
+	public <T> List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> tasks) {
 		ArrayList<Future<T>> result = new ArrayList<>();
 
 		for (Callable<T> task : tasks) {
 			try {
-				result.add(new CompletedFuture<T>(task.call(), null));
+				result.add(new CompletedFuture<>(task.call(), null));
 			} catch (Exception e) {
-				result.add(new CompletedFuture<T>(null, e));
+				result.add(new CompletedFuture<>(null, e));
 			}
 		}
 		return result;
 	}
 
 	@Override
-	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+	@Nonnull
+	public <T> List<Future<T>> invokeAll(
+		@Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit) {
+
 		long end = System.currentTimeMillis() + unit.toMillis(timeout);
 		Iterator<? extends Callable<T>> iterator = tasks.iterator();
 		ArrayList<Future<T>> result = new ArrayList<>();
@@ -108,13 +121,13 @@ public class DirectExecutorService implements ExecutorService {
 			Callable<T> callable = iterator.next();
 
 			try {
-				result.add(new CompletedFuture<T>(callable.call(), null));
+				result.add(new CompletedFuture<>(callable.call(), null));
 			} catch (Exception e) {
-				result.add(new CompletedFuture<T>(null, e));
+				result.add(new CompletedFuture<>(null, e));
 			}
 		}
 
-		while(iterator.hasNext()) {
+		while (iterator.hasNext()) {
 			iterator.next();
 			result.add(new Future<T>() {
 				@Override
@@ -133,12 +146,12 @@ public class DirectExecutorService implements ExecutorService {
 				}
 
 				@Override
-				public T get() throws InterruptedException, ExecutionException {
+				public T get() {
 					throw new CancellationException("Task has been cancelled.");
 				}
 
 				@Override
-				public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+				public T get(long timeout, @Nonnull TimeUnit unit) {
 					throw new CancellationException("Task has been cancelled.");
 				}
 			});
@@ -148,7 +161,8 @@ public class DirectExecutorService implements ExecutorService {
 	}
 
 	@Override
-	public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+	@Nonnull
+	public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks) throws ExecutionException {
 		Exception exception = null;
 
 		for (Callable<T> task : tasks) {
@@ -164,7 +178,11 @@ public class DirectExecutorService implements ExecutorService {
 	}
 
 	@Override
-	public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+	public <T> T invokeAny(
+		@Nonnull Collection<? extends Callable<T>> tasks,
+		long timeout,
+		@Nonnull TimeUnit unit) throws ExecutionException, TimeoutException {
+
 		long end = System.currentTimeMillis() + unit.toMillis(timeout);
 		Exception exception = null;
 
@@ -189,15 +207,15 @@ public class DirectExecutorService implements ExecutorService {
 	}
 
 	@Override
-	public void execute(Runnable command) {
+	public void execute(@Nonnull Runnable command) {
 		command.run();
 	}
 
-	public static class CompletedFuture<V> implements Future<V> {
+	static class CompletedFuture<V> implements Future<V> {
 		private final V value;
 		private final Exception exception;
 
-		public CompletedFuture(V value, Exception exception) {
+		CompletedFuture(V value, Exception exception) {
 			this.value = value;
 			this.exception = exception;
 		}
@@ -218,7 +236,7 @@ public class DirectExecutorService implements ExecutorService {
 		}
 
 		@Override
-		public V get() throws InterruptedException, ExecutionException {
+		public V get() throws ExecutionException {
 			if (exception != null) {
 				throw new ExecutionException(exception);
 			} else {
@@ -227,7 +245,7 @@ public class DirectExecutorService implements ExecutorService {
 		}
 
 		@Override
-		public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+		public V get(long timeout, @Nonnull TimeUnit unit) throws ExecutionException {
 			return get();
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 703ac4e..41d9a32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -18,22 +18,16 @@
 
 package org.apache.flink.runtime.concurrent;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 import scala.concurrent.ExecutionContext;
 
 /**
- * Collection of {@link Executor} implementations.
+ * Collection of {@link Executor}, {@link ExecutorService} and {@link ExecutionContext} implementations.
  */
 public class Executors {
 
-	private static final Logger LOG = LoggerFactory.getLogger(Executors.class);
-
 	/**
 	 * Return a direct executor. The direct executor directly executes the runnable in the calling
 	 * thread.
@@ -41,22 +35,19 @@ public class Executors {
 	 * @return Direct executor
 	 */
 	public static Executor directExecutor() {
-		return DirectExecutor.INSTANCE;
+		return DirectExecutorService.INSTANCE;
 	}
 
 	/**
-	 * Direct executor implementation.
+	 * Return a new direct executor service.
+	 *
+	 * <p>The direct executor service directly executes the runnables and the callables in the calling
+	 * thread.
+	 *
+	 * @return New direct executor service
 	 */
-	private static class DirectExecutor implements Executor {
-
-		static final DirectExecutor INSTANCE = new DirectExecutor();
-
-		private DirectExecutor() {}
-
-		@Override
-		public void execute(@Nonnull Runnable command) {
-			command.run();
-		}
+	public static ExecutorService newDirectExecutorService() {
+		return new DirectExecutorService();
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
index e4e86bb..f8bfa94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -79,7 +78,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			heartbeatTimeout,
 			ownResourceID,
 			heartbeatListener,
-			new DirectExecutorService(),
+			Executors.directExecutor(),
 			scheduledExecutor,
 			LOG);
 
@@ -122,7 +121,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			heartbeatTimeout,
 			ownResourceID,
 			heartbeatListener,
-			new DirectExecutorService(),
+			Executors.directExecutor(),
 			scheduledExecutor,
 			LOG);
 
@@ -163,7 +162,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			heartbeatTimeout,
 			ownResourceID,
 			heartbeatListener,
-			new DirectExecutorService(),
+			Executors.directExecutor(),
 			new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
 			LOG);
 
@@ -215,7 +214,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			heartbeatTimeout,
 			resourceID,
 			heartbeatListener,
-			new DirectExecutorService(),
+			Executors.directExecutor(),
 			new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
 			LOG);
 
@@ -224,7 +223,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			heartbeatTimeout,
 			resourceID2,
 			heartbeatListener2,
-			new DirectExecutorService(),
+			Executors.directExecutor(),
 			new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
 			LOG);
 
@@ -264,7 +263,7 @@ public class HeartbeatManagerTest extends TestLogger {
 			heartbeatTimeout,
 			resourceID,
 			heartbeatListener,
-			new DirectExecutorService(),
+			Executors.directExecutor(),
 			new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
 			LOG);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index a593bd5..8dc4b44 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -91,7 +91,6 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -137,6 +136,7 @@ import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -423,7 +423,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "lock", new Object());
 		Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
 		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
-		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService());
+		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", newDirectExecutorService());
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 		Whitebox.setInternalState(streamTask, "checkpointStorage", new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));