You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/12/20 10:04:47 UTC
[flink] 01/02: [FLINK-10461][runtime] Refactor direct executor service
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 17082604cac8ad596ce42d21ef748eea6e5ae2e2
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Sat Dec 15 10:50:44 2018 +0100
[FLINK-10461][runtime] Refactor direct executor service
Co-authored-by: Andrey Zagrebin <az...@gmail.com>
Co-authored-by: klion26 <qc...@gmail.com>
---
.../runtime/concurrent}/DirectExecutorService.java | 74 ++++++++++++++--------
.../apache/flink/runtime/concurrent/Executors.java | 31 ++++-----
.../runtime/heartbeat/HeartbeatManagerTest.java | 13 ++--
.../streaming/runtime/tasks/StreamTaskTest.java | 4 +-
4 files changed, 65 insertions(+), 57 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/DirectExecutorService.java
similarity index 69%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/DirectExecutorService.java
index 1d7c971..c37adbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/DirectExecutorService.java
@@ -16,7 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.util;
+package org.apache.flink.runtime.concurrent;
+
+import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
@@ -31,37 +33,42 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-public class DirectExecutorService implements ExecutorService {
- private boolean _shutdown = false;
+/** The direct executor service directly executes the runnables and the callables in the calling thread. */
+class DirectExecutorService implements ExecutorService {
+ static final DirectExecutorService INSTANCE = new DirectExecutorService();
+
+ private boolean isShutdown = false;
@Override
public void shutdown() {
- _shutdown = true;
+ isShutdown = true;
}
@Override
+ @Nonnull
public List<Runnable> shutdownNow() {
- _shutdown = true;
+ isShutdown = true;
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
- return _shutdown;
+ return isShutdown;
}
@Override
public boolean isTerminated() {
- return _shutdown;
+ return isShutdown;
}
@Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- return _shutdown;
+ public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) {
+ return isShutdown;
}
@Override
- public <T> Future<T> submit(Callable<T> task) {
+ @Nonnull
+ public <T> Future<T> submit(@Nonnull Callable<T> task) {
try {
T result = task.call();
@@ -72,34 +79,40 @@ public class DirectExecutorService implements ExecutorService {
}
@Override
- public <T> Future<T> submit(Runnable task, T result) {
+ @Nonnull
+ public <T> Future<T> submit(@Nonnull Runnable task, T result) {
task.run();
return new CompletedFuture<>(result, null);
}
@Override
- public Future<?> submit(Runnable task) {
+ @Nonnull
+ public Future<?> submit(@Nonnull Runnable task) {
task.run();
return new CompletedFuture<>(null, null);
}
@Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ @Nonnull
+ public <T> List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> tasks) {
ArrayList<Future<T>> result = new ArrayList<>();
for (Callable<T> task : tasks) {
try {
- result.add(new CompletedFuture<T>(task.call(), null));
+ result.add(new CompletedFuture<>(task.call(), null));
} catch (Exception e) {
- result.add(new CompletedFuture<T>(null, e));
+ result.add(new CompletedFuture<>(null, e));
}
}
return result;
}
@Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ @Nonnull
+ public <T> List<Future<T>> invokeAll(
+ @Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit) {
+
long end = System.currentTimeMillis() + unit.toMillis(timeout);
Iterator<? extends Callable<T>> iterator = tasks.iterator();
ArrayList<Future<T>> result = new ArrayList<>();
@@ -108,13 +121,13 @@ public class DirectExecutorService implements ExecutorService {
Callable<T> callable = iterator.next();
try {
- result.add(new CompletedFuture<T>(callable.call(), null));
+ result.add(new CompletedFuture<>(callable.call(), null));
} catch (Exception e) {
- result.add(new CompletedFuture<T>(null, e));
+ result.add(new CompletedFuture<>(null, e));
}
}
- while(iterator.hasNext()) {
+ while (iterator.hasNext()) {
iterator.next();
result.add(new Future<T>() {
@Override
@@ -133,12 +146,12 @@ public class DirectExecutorService implements ExecutorService {
}
@Override
- public T get() throws InterruptedException, ExecutionException {
+ public T get() {
throw new CancellationException("Task has been cancelled.");
}
@Override
- public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ public T get(long timeout, @Nonnull TimeUnit unit) {
throw new CancellationException("Task has been cancelled.");
}
});
@@ -148,7 +161,8 @@ public class DirectExecutorService implements ExecutorService {
}
@Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ @Nonnull
+ public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks) throws ExecutionException {
Exception exception = null;
for (Callable<T> task : tasks) {
@@ -164,7 +178,11 @@ public class DirectExecutorService implements ExecutorService {
}
@Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ public <T> T invokeAny(
+ @Nonnull Collection<? extends Callable<T>> tasks,
+ long timeout,
+ @Nonnull TimeUnit unit) throws ExecutionException, TimeoutException {
+
long end = System.currentTimeMillis() + unit.toMillis(timeout);
Exception exception = null;
@@ -189,15 +207,15 @@ public class DirectExecutorService implements ExecutorService {
}
@Override
- public void execute(Runnable command) {
+ public void execute(@Nonnull Runnable command) {
command.run();
}
- public static class CompletedFuture<V> implements Future<V> {
+ static class CompletedFuture<V> implements Future<V> {
private final V value;
private final Exception exception;
- public CompletedFuture(V value, Exception exception) {
+ CompletedFuture(V value, Exception exception) {
this.value = value;
this.exception = exception;
}
@@ -218,7 +236,7 @@ public class DirectExecutorService implements ExecutorService {
}
@Override
- public V get() throws InterruptedException, ExecutionException {
+ public V get() throws ExecutionException {
if (exception != null) {
throw new ExecutionException(exception);
} else {
@@ -227,7 +245,7 @@ public class DirectExecutorService implements ExecutorService {
}
@Override
- public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ public V get(long timeout, @Nonnull TimeUnit unit) throws ExecutionException {
return get();
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 703ac4e..41d9a32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -18,22 +18,16 @@
package org.apache.flink.runtime.concurrent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import scala.concurrent.ExecutionContext;
/**
- * Collection of {@link Executor} implementations.
+ * Collection of {@link Executor}, {@link ExecutorService} and {@link ExecutionContext} implementations.
*/
public class Executors {
- private static final Logger LOG = LoggerFactory.getLogger(Executors.class);
-
/**
* Return a direct executor. The direct executor directly executes the runnable in the calling
* thread.
@@ -41,22 +35,19 @@ public class Executors {
* @return Direct executor
*/
public static Executor directExecutor() {
- return DirectExecutor.INSTANCE;
+ return DirectExecutorService.INSTANCE;
}
/**
- * Direct executor implementation.
+ * Return a new direct executor service.
+ *
+ * <p>The direct executor service directly executes the runnables and the callables in the calling
+ * thread.
+ *
+ * @return New direct executor service
*/
- private static class DirectExecutor implements Executor {
-
- static final DirectExecutor INSTANCE = new DirectExecutor();
-
- private DirectExecutor() {}
-
- @Override
- public void execute(@Nonnull Runnable command) {
- command.run();
- }
+ public static ExecutorService newDirectExecutorService() {
+ return new DirectExecutorService();
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
index e4e86bb..f8bfa94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -79,7 +78,7 @@ public class HeartbeatManagerTest extends TestLogger {
heartbeatTimeout,
ownResourceID,
heartbeatListener,
- new DirectExecutorService(),
+ Executors.directExecutor(),
scheduledExecutor,
LOG);
@@ -122,7 +121,7 @@ public class HeartbeatManagerTest extends TestLogger {
heartbeatTimeout,
ownResourceID,
heartbeatListener,
- new DirectExecutorService(),
+ Executors.directExecutor(),
scheduledExecutor,
LOG);
@@ -163,7 +162,7 @@ public class HeartbeatManagerTest extends TestLogger {
heartbeatTimeout,
ownResourceID,
heartbeatListener,
- new DirectExecutorService(),
+ Executors.directExecutor(),
new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
LOG);
@@ -215,7 +214,7 @@ public class HeartbeatManagerTest extends TestLogger {
heartbeatTimeout,
resourceID,
heartbeatListener,
- new DirectExecutorService(),
+ Executors.directExecutor(),
new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
LOG);
@@ -224,7 +223,7 @@ public class HeartbeatManagerTest extends TestLogger {
heartbeatTimeout,
resourceID2,
heartbeatListener2,
- new DirectExecutorService(),
+ Executors.directExecutor(),
new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
LOG);
@@ -264,7 +263,7 @@ public class HeartbeatManagerTest extends TestLogger {
heartbeatTimeout,
resourceID,
heartbeatListener,
- new DirectExecutorService(),
+ Executors.directExecutor(),
new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
LOG);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index a593bd5..8dc4b44 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -91,7 +91,6 @@ import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -137,6 +136,7 @@ import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
@@ -423,7 +423,7 @@ public class StreamTaskTest extends TestLogger {
Whitebox.setInternalState(streamTask, "lock", new Object());
Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
- Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService());
+ Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", newDirectExecutorService());
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
Whitebox.setInternalState(streamTask, "checkpointStorage", new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));