You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/05/04 15:31:10 UTC

[flink] branch master updated: [FLINK-31995][tests] Adds shutdown check to DirectExecutorService

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d430f51d77 [FLINK-31995][tests] Adds shutdown check to DirectExecutorService
8d430f51d77 is described below

commit 8d430f51d77cf4f0d3291da6a7333f1aa9a87d22
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Thu May 4 09:56:12 2023 +0200

    [FLINK-31995][tests] Adds shutdown check to DirectExecutorService
    
    This change is added to make DirectExecutorService match the contract
    of ExecutorService more closely in terms of RejectedExecutionExceptions.
    
    Signed-off-by: Matthias Pohl <ma...@aiven.io>
---
 .../util/concurrent/DirectExecutorService.java     | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/DirectExecutorService.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/DirectExecutorService.java
index 559fe64a6e2..fa9aca67501 100644
--- a/flink-core/src/main/java/org/apache/flink/util/concurrent/DirectExecutorService.java
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/DirectExecutorService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -72,6 +73,8 @@ class DirectExecutorService implements ExecutorService {
     @Override
     @Nonnull
     public <T> Future<T> submit(@Nonnull Callable<T> task) {
+        throwRejectedExecutionExceptionIfShutdown();
+
         try {
             T result = task.call();
 
@@ -84,6 +87,8 @@ class DirectExecutorService implements ExecutorService {
     @Override
     @Nonnull
     public <T> Future<T> submit(@Nonnull Runnable task, T result) {
+        throwRejectedExecutionExceptionIfShutdown();
+
         task.run();
 
         return new CompletedFuture<>(result, null);
@@ -92,6 +97,8 @@ class DirectExecutorService implements ExecutorService {
     @Override
     @Nonnull
     public Future<?> submit(@Nonnull Runnable task) {
+        throwRejectedExecutionExceptionIfShutdown();
+
         task.run();
         return new CompletedFuture<>(null, null);
     }
@@ -99,6 +106,8 @@ class DirectExecutorService implements ExecutorService {
     @Override
     @Nonnull
     public <T> List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> tasks) {
+        throwRejectedExecutionExceptionIfShutdown();
+
         ArrayList<Future<T>> result = new ArrayList<>();
 
         for (Callable<T> task : tasks) {
@@ -117,6 +126,7 @@ class DirectExecutorService implements ExecutorService {
             @Nonnull Collection<? extends Callable<T>> tasks,
             long timeout,
             @Nonnull TimeUnit unit) {
+        throwRejectedExecutionExceptionIfShutdown();
 
         long end = System.currentTimeMillis() + unit.toMillis(timeout);
         Iterator<? extends Callable<T>> iterator = tasks.iterator();
@@ -170,6 +180,8 @@ class DirectExecutorService implements ExecutorService {
     @Nonnull
     public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks)
             throws ExecutionException {
+        throwRejectedExecutionExceptionIfShutdown();
+
         Exception exception = null;
 
         for (Callable<T> task : tasks) {
@@ -188,6 +200,7 @@ class DirectExecutorService implements ExecutorService {
     public <T> T invokeAny(
             @Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit)
             throws ExecutionException, TimeoutException {
+        throwRejectedExecutionExceptionIfShutdown();
 
         long end = System.currentTimeMillis() + unit.toMillis(timeout);
         Exception exception = null;
@@ -214,9 +227,18 @@ class DirectExecutorService implements ExecutorService {
 
     @Override
     public void execute(@Nonnull Runnable command) {
+        throwRejectedExecutionExceptionIfShutdown();
+
         command.run();
     }
 
+    private void throwRejectedExecutionExceptionIfShutdown() {
+        if (isShutdown()) {
+            throw new RejectedExecutionException(
+                    "The ExecutorService is shut down already. No Callables can be executed.");
+        }
+    }
+
     static class CompletedFuture<V> implements Future<V> {
         private final V value;
         private final Exception exception;