You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/28 15:46:40 UTC

[flink] 04/07: [FLINK-17558][runtime] Add Executors#newCachedThreadPool

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e8dbb8dbd51fa896ed7258cedb955931bc0e03d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 27 12:59:28 2020 +0200

    [FLINK-17558][runtime] Add Executors#newCachedThreadPool
---
 .../apache/flink/runtime/concurrent/Executors.java | 27 ++++++++++
 .../flink/runtime/concurrent/ExecutorsTest.java    | 63 ++++++++++++++++++++++
 2 files changed, 90 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 41d9a32..c758752 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -18,8 +18,14 @@
 
 package org.apache.flink.runtime.concurrent;
 
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.ExecutionContext;
 
@@ -61,6 +67,27 @@ public class Executors {
 	}
 
 	/**
+	 * Returns a new cached thread pool that grows on demand up to the desired maximum size.
+	 *
+	 * <p>Unlike {@link java.util.concurrent.Executors#newCachedThreadPool()}, tasks submitted while all threads are
+	 * busy are queued in an unbounded {@link LinkedBlockingQueue} instead of being rejected with an exception.
+	 * The core pool size equals {@code maxPoolSize}, because a {@link ThreadPoolExecutor} with an unbounded queue
+	 * never creates threads beyond its core size; idle threads are terminated after 60 seconds.
+	 *
+	 * @see ExecutorThreadFactory
+	 * @param maxPoolSize maximum size of the thread pool
+	 * @param threadFactory thread factory to use
+	 * @return new cached thread pool
+	 */
+	public static ExecutorService newCachedThreadPool(int maxPoolSize, ThreadFactory threadFactory) {
+		final ThreadPoolExecutor executor = new ThreadPoolExecutor(maxPoolSize, maxPoolSize,
+			60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);
+		// let core threads time out as well, so that an idle pool shrinks back to zero threads
+		executor.allowCoreThreadTimeOut(true);
+		return executor;
+	}
+
+	/**
 	 * Direct execution context.
 	 */
 	private static class DirectExecutionContext implements ExecutionContext {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
new file mode 100644
index 0000000..e3be776
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Unit tests covering the executor factory methods of {@link Executors}.
+ */
+public class ExecutorsTest {
+
+	@Rule
+	public final TestExecutorResource executorResource = new TestExecutorResource(
+		() -> Executors.newCachedThreadPool(1, new ExecutorThreadFactory()));
+
+	/**
+	 * Verifies that the {@link ExecutorService} created by {@link Executors#newCachedThreadPool(int, ThreadFactory)}
+	 * queues a task submitted while its only thread is busy. An earlier implementation was backed by a synchronous
+	 * queue and threw an exception whenever no thread was free to take the task.
+	 */
+	@Test
+	public void testNewCachedThreadPoolDoesNotRejectTasksExceedingActiveThreadCount() throws InterruptedException {
+		final BlockerSync blocker = new BlockerSync();
+		final Runnable blockingTask = blocker::blockNonInterruptible;
+		final Executor pool = executorResource.getExecutor();
+
+		try {
+			// occupy the single pool thread and wait until it is blocked
+			pool.execute(blockingTask);
+			blocker.awaitBlocker();
+
+			// no thread is available at this point, yet the submission must be accepted
+			pool.execute(blockingTask);
+		} finally {
+			// release the blocked task regardless of the outcome
+			blocker.releaseBlocker();
+		}
+	}
+}