You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/28 15:46:40 UTC
[flink] 04/07: [FLINK-17558][runtime] Add Executors#newCachedThreadPool
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8e8dbb8dbd51fa896ed7258cedb955931bc0e03d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 27 12:59:28 2020 +0200
[FLINK-17558][runtime] Add Executors#newCachedThreadPool
---
.../apache/flink/runtime/concurrent/Executors.java | 27 ++++++++++
.../flink/runtime/concurrent/ExecutorsTest.java | 63 ++++++++++++++++++++++
2 files changed, 90 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 41d9a32..c758752 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -18,8 +18,14 @@
package org.apache.flink.runtime.concurrent;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import scala.concurrent.ExecutionContext;
@@ -61,6 +67,27 @@ public class Executors {
}
/**
+ * Returns a new cached thread pool that grows on demand up to the desired maximum size.
+ *
+ * <p>Threads are started as tasks arrive until {@code maxPoolSize} threads exist. Tasks submitted while all threads
+ * are busy wait in an unbounded {@link LinkedBlockingQueue} instead of being rejected with an exception, which is
+ * where this pool differs from {@link java.util.concurrent.Executors#newCachedThreadPool()}.
+ * Threads that stay idle for 60 seconds are terminated, so unlike
+ * {@link java.util.concurrent.Executors#newFixedThreadPool(int, ThreadFactory)} an idle pool shrinks back to 0
+ * threads.
+ *
+ * @see ExecutorThreadFactory
+ * @param maxPoolSize maximum size of the thread pool, must be greater than 0
+ * @param threadFactory thread factory to use
+ * @return new cached thread pool
+ * @throws IllegalArgumentException if {@code maxPoolSize} is not positive
+ */
+ public static ExecutorService newCachedThreadPool(int maxPoolSize, ThreadFactory threadFactory) {
+ // A ThreadPoolExecutor only starts threads beyond its core size once the work queue refuses a task. An unbounded
+ // queue never refuses, so with a core size of 0 the pool would be stuck at a single thread and maxPoolSize
+ // would have no effect. Use maxPoolSize as the core size and let core threads time out instead, which keeps
+ // the "shrinks to 0 when idle" behavior.
+ final ThreadPoolExecutor executor = new ThreadPoolExecutor(maxPoolSize, maxPoolSize,
+ 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ threadFactory);
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
+ }
+
+ /**
* Direct execution context.
*/
private static class DirectExecutionContext implements ExecutionContext {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
new file mode 100644
index 0000000..e3be776
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Unit tests for the factory methods of {@link Executors}.
+ */
+public class ExecutorsTest {
+
+ @Rule
+ public final TestExecutorResource executorResource = new TestExecutorResource(
+ () -> Executors.newCachedThreadPool(1, new ExecutorThreadFactory()));
+
+ /**
+ * Verifies that the {@link ExecutorService} created by {@link Executors#newCachedThreadPool(int, ThreadFactory)}
+ * queues a task that arrives while its only thread is busy. A prior implementation used a synchronous queue and
+ * rejected such tasks with an exception when no thread was available to process them.
+ */
+ @Test
+ public void testNewCachedThreadPoolDoesNotRejectTasksExceedingActiveThreadCount() throws InterruptedException {
+ final BlockerSync blocker = new BlockerSync();
+ final Executor pool = executorResource.getExecutor();
+ final Runnable blockingTask = blocker::blockNonInterruptible;
+
+ try {
+ // occupy the single pool thread with a task that blocks
+ pool.execute(blockingTask);
+
+ // wait until the pool thread is actually blocked
+ blocker.awaitBlocker();
+
+ // a second submission must be accepted rather than rejected
+ pool.execute(blockingTask);
+ } finally {
+ // always unblock the pool thread so the executor can be shut down
+ blocker.releaseBlocker();
+ }
+ }
+}