You are viewing a plain text version of this content. (The canonical link was a hyperlink that is not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/31 21:44:54 UTC

[flink] branch release-1.10 updated (006b8d0 -> 6f247b4)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 006b8d0  [FLINK-17844][build] Update japicmp configuration for 1.10.1
     new d4f940f  [FLINK-18035][runtime] Use fixed thread pool
     new 6f247b4  Revert "[FLINK-17558][runtime] Add Executors#newCachedThreadPool"

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/runtime/concurrent/Executors.java | 27 ----------
 .../runtime/taskexecutor/TaskManagerRunner.java    |  4 +-
 .../flink/runtime/concurrent/ExecutorsTest.java    | 63 ----------------------
 3 files changed, 2 insertions(+), 92 deletions(-)
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java


[flink] 02/02: Revert "[FLINK-17558][runtime] Add Executors#newCachedThreadPool"

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f247b4400d3417795709f311c827c45b2062272
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 29 14:12:08 2020 +0200

    Revert "[FLINK-17558][runtime] Add Executors#newCachedThreadPool"
    
    This reverts commit 90b8455d08eda7a6a55f5cc952fa1adf3a48ff96.
---
 .../apache/flink/runtime/concurrent/Executors.java | 27 ----------
 .../flink/runtime/concurrent/ExecutorsTest.java    | 63 ----------------------
 2 files changed, 90 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index c758752..41d9a32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -18,14 +18,8 @@
 
 package org.apache.flink.runtime.concurrent;
 
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
-
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.ExecutionContext;
 
@@ -67,27 +61,6 @@ public class Executors {
 	}
 
 	/**
-	 * Returns a new cached thread pool with the desired maximum size.
-	 *
-	 * <p>This method is a variation of {@link java.util.concurrent.Executors#newFixedThreadPool(int, ThreadFactory)},
-	 * with the minimum pool size set to 0.
-	 * In that respect it is similar to {@link java.util.concurrent.Executors#newCachedThreadPool()}, but it uses a
-	 * {@link LinkedBlockingQueue} instead to allow tasks to be queued, instead of failing with an exception if the pool
-	 * is saturated.
-	 *
-	 * @see ExecutorThreadFactory
-	 * @param maxPoolSize maximum size of the thread pool
-	 * @param threadFactory thread factory to use
-	 * @return new cached thread pool
-	 */
-	public static ExecutorService newCachedThreadPool(int maxPoolSize, ThreadFactory threadFactory) {
-		return new ThreadPoolExecutor(0, maxPoolSize,
-			60L, TimeUnit.SECONDS,
-			new LinkedBlockingQueue<>(),
-			threadFactory);
-	}
-
-	/**
 	 * Direct execution context.
 	 */
 	private static class DirectExecutionContext implements ExecutionContext {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
deleted file mode 100644
index e3be776..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.concurrent;
-
-import org.apache.flink.core.testutils.BlockerSync;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadFactory;
-
-/**
- * Tests for {@link Executors}.
- */
-public class ExecutorsTest {
-
-	@Rule
-	public final TestExecutorResource executorResource = new TestExecutorResource(
-		() -> Executors.newCachedThreadPool(1, new ExecutorThreadFactory()));
-
-	/**
-	 * Tests that the {@link ExecutorService} returned by {@link Executors#newCachedThreadPool(int, ThreadFactory)}
-	 * allows tasks to be queued. In a prior implementation the executor used a synchronous queue, rejecting tasks with
-	 * an exception if no thread was available to process it.
-	 */
-	@Test
-	public void testNewCachedThreadPoolDoesNotRejectTasksExceedingActiveThreadCount() throws InterruptedException {
-		Executor executor = executorResource.getExecutor();
-
-		BlockerSync sync = new BlockerSync();
-		try {
-			// submit the first blocking task, which should block the single pool thread
-			executor.execute(sync::blockNonInterruptible);
-
-			// the thread is now blocked
-			sync.awaitBlocker();
-
-			// this task should not be rejected
-			executor.execute(sync::blockNonInterruptible);
-		} finally {
-			sync.releaseBlocker();
-		}
-	}
-}


[flink] 01/02: [FLINK-18035][runtime] Use fixed thread pool

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4f940f4bf247f47b0304401b064317de981a724
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 29 14:11:47 2020 +0200

    [FLINK-18035][runtime] Use fixed thread pool
---
 .../java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 7f66d03..22877a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -31,7 +31,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
@@ -76,6 +75,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -374,7 +374,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			resourceID,
 			taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
-		final ExecutorService ioExecutor = Executors.newCachedThreadPool(
+		final ExecutorService ioExecutor = Executors.newFixedThreadPool(
 			taskManagerServicesConfiguration.getNumIoThreads(),
 			new ExecutorThreadFactory("flink-taskexecutor-io"));