You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/05/04 15:31:10 UTC
[flink] branch master updated: [FLINK-31995][tests] Adds shutdown check to DirectExecutorService
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8d430f51d77 [FLINK-31995][tests] Adds shutdown check to DirectExecutorService
8d430f51d77 is described below
commit 8d430f51d77cf4f0d3291da6a7333f1aa9a87d22
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Thu May 4 09:56:12 2023 +0200
[FLINK-31995][tests] Adds shutdown check to DirectExecutorService
This change is added to make DirectExecutorService match the contract
of ExecutorService more closely in terms of RejectedExecutionExceptions.
Signed-off-by: Matthias Pohl <ma...@aiven.io>
---
.../util/concurrent/DirectExecutorService.java | 22 ++++++++++++++++++++++
1 file changed, 22 insertions(+)
diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/DirectExecutorService.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/DirectExecutorService.java
index 559fe64a6e2..fa9aca67501 100644
--- a/flink-core/src/main/java/org/apache/flink/util/concurrent/DirectExecutorService.java
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/DirectExecutorService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -72,6 +73,8 @@ class DirectExecutorService implements ExecutorService {
@Override
@Nonnull
public <T> Future<T> submit(@Nonnull Callable<T> task) {
+ throwRejectedExecutionExceptionIfShutdown();
+
try {
T result = task.call();
@@ -84,6 +87,8 @@ class DirectExecutorService implements ExecutorService {
@Override
@Nonnull
public <T> Future<T> submit(@Nonnull Runnable task, T result) {
+ throwRejectedExecutionExceptionIfShutdown();
+
task.run();
return new CompletedFuture<>(result, null);
@@ -92,6 +97,8 @@ class DirectExecutorService implements ExecutorService {
@Override
@Nonnull
public Future<?> submit(@Nonnull Runnable task) {
+ throwRejectedExecutionExceptionIfShutdown();
+
task.run();
return new CompletedFuture<>(null, null);
}
@@ -99,6 +106,8 @@ class DirectExecutorService implements ExecutorService {
@Override
@Nonnull
public <T> List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> tasks) {
+ throwRejectedExecutionExceptionIfShutdown();
+
ArrayList<Future<T>> result = new ArrayList<>();
for (Callable<T> task : tasks) {
@@ -117,6 +126,7 @@ class DirectExecutorService implements ExecutorService {
@Nonnull Collection<? extends Callable<T>> tasks,
long timeout,
@Nonnull TimeUnit unit) {
+ throwRejectedExecutionExceptionIfShutdown();
long end = System.currentTimeMillis() + unit.toMillis(timeout);
Iterator<? extends Callable<T>> iterator = tasks.iterator();
@@ -170,6 +180,8 @@ class DirectExecutorService implements ExecutorService {
@Nonnull
public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks)
throws ExecutionException {
+ throwRejectedExecutionExceptionIfShutdown();
+
Exception exception = null;
for (Callable<T> task : tasks) {
@@ -188,6 +200,7 @@ class DirectExecutorService implements ExecutorService {
public <T> T invokeAny(
@Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit)
throws ExecutionException, TimeoutException {
+ throwRejectedExecutionExceptionIfShutdown();
long end = System.currentTimeMillis() + unit.toMillis(timeout);
Exception exception = null;
@@ -214,9 +227,18 @@ class DirectExecutorService implements ExecutorService {
@Override
public void execute(@Nonnull Runnable command) {
+ throwRejectedExecutionExceptionIfShutdown();
+
command.run();
}
+ private void throwRejectedExecutionExceptionIfShutdown() {
+ if (isShutdown()) {
+ throw new RejectedExecutionException(
+ "The ExecutorService is shut down already. No Callables can be executed.");
+ }
+ }
+
static class CompletedFuture<V> implements Future<V> {
private final V value;
private final Exception exception;