You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/15 12:20:30 UTC
[flink] 02/04: [FLINK-13205][runtime] Make checkpoints injection ordered again (partial revert of FLINK-11458): use single threaded Task's dispatcher thread pool
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4f1837297e8f4ffae09b04ad68bd936db1d1231d
Author: Aleksey Pak <al...@ververica.com>
AuthorDate: Tue Jun 4 14:38:13 2019 +0200
[FLINK-13205][runtime] Make checkpoints injection ordered again (partial revert of FLINK-11458): use single threaded Task's dispatcher thread pool
---
.../BlockingCallMonitoringThreadPool.java | 127 ---------------------
.../org/apache/flink/runtime/taskmanager/Task.java | 36 +++---
.../BlockingCallMonitoringThreadPoolTest.java | 112 ------------------
.../runtime/taskmanager/TaskAsyncCallTest.java | 11 +-
4 files changed, 15 insertions(+), 271 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
deleted file mode 100644
index d0fb868..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.annotation.VisibleForTesting;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A Thread Pool used to monitor the number of in-flight calls that block and wait for another task executed
- * by the same pool in order to get unblocked. When a call (blocking or non-blocking) is submitted, the size
- * of the pool is set to {@code 1 + activeBlockingCalls}. This allows the thread pool size to follow the needs
- * of the system and to avoid any redundant idle threads consuming resources.
- */
-public class BlockingCallMonitoringThreadPool {
-
- private static final Logger LOG = LoggerFactory.getLogger(BlockingCallMonitoringThreadPool.class);
-
- private final AtomicInteger inFlightBlockingCallCounter = new AtomicInteger(0);
-
- private final ThreadPoolExecutor executor;
-
- public BlockingCallMonitoringThreadPool() {
- this(Executors.defaultThreadFactory());
- }
-
- public BlockingCallMonitoringThreadPool(final ThreadFactory dispatcherThreadFactory) {
- this.executor = new ThreadPoolExecutor(
- 1,
- 1,
- 10L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
- checkNotNull(dispatcherThreadFactory));
- }
-
- public CompletableFuture<?> submit(final Runnable runnable, final boolean blocking) {
- if (blocking) {
- return submitBlocking(runnable);
- } else {
- return submit(runnable);
- }
- }
-
- private CompletableFuture<?> submit(final Runnable task) {
- adjustThreadPoolSize(inFlightBlockingCallCounter.get());
- return CompletableFuture.runAsync(task, executor);
- }
-
- private CompletableFuture<?> submitBlocking(final Runnable task) {
- adjustThreadPoolSize(inFlightBlockingCallCounter.incrementAndGet());
- return CompletableFuture.runAsync(task, executor).whenComplete(
- (ignored, e) -> inFlightBlockingCallCounter.decrementAndGet());
- }
-
- private void adjustThreadPoolSize(final int activeBlockingCalls) {
- if (activeBlockingCalls > 1) {
- LOG.debug("There are {} active threads with blocking calls", activeBlockingCalls);
- }
-
- final int newPoolSize = 1 + activeBlockingCalls;
-
- // We have to reset the core pool size because (quoted from the official docs):
- // ``
- // If there are more than corePoolSize but less than maximumPoolSize threads running,
- // ** a new thread will be created ONLY IF THE QUEUE IS FULL **.
- // ``
-
- // ensure that regardless of whether we increase/reduce the pool size, maximum is always >= core
- if (newPoolSize < executor.getCorePoolSize()) {
- executor.setCorePoolSize(newPoolSize);
- executor.setMaximumPoolSize(newPoolSize);
- } else {
- executor.setMaximumPoolSize(newPoolSize);
- executor.setCorePoolSize(newPoolSize);
- }
- }
-
- public void shutdown() {
- executor.shutdown();
- }
-
- public boolean isShutdown() {
- return executor.isShutdown();
- }
-
- public void shutdownNow() {
- executor.shutdownNow();
- }
-
- @VisibleForTesting
- int getMaximumPoolSize() {
- return executor.getMaximumPoolSize();
- }
-
- @VisibleForTesting
- int getQueueSize() {
- return executor.getQueue().size();
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 4355821..d4e1d8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -93,6 +93,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -257,8 +259,8 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
/** The observed exception, in case the task execution failed. */
private volatile Throwable failureCause;
- /** Executor for asynchronous calls (checkpoints, etc), lazily initialized. */
- private volatile BlockingCallMonitoringThreadPool asyncCallDispatcher;
+ /** Serial executor for asynchronous calls (checkpoints, etc), lazily initialized. */
+ private volatile ExecutorService asyncCallDispatcher;
/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
private long taskCancellationInterval;
@@ -789,7 +791,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
// stop the async dispatcher.
// copy dispatcher reference to stack, against concurrent release
- final BlockingCallMonitoringThreadPool dispatcher = this.asyncCallDispatcher;
+ ExecutorService dispatcher = this.asyncCallDispatcher;
if (dispatcher != null && !dispatcher.isShutdown()) {
dispatcher.shutdownNow();
}
@@ -1153,8 +1155,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
};
executeAsyncCallRunnable(
runnable,
- String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId),
- checkpointOptions.getCheckpointType().isSynchronous());
+ String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
}
else {
LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
@@ -1189,8 +1190,8 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
};
executeAsyncCallRunnable(
runnable,
- "Checkpoint Confirmation for " + taskNameWithSubtask,
- false);
+ "Checkpoint Confirmation for " + taskNameWithSubtask
+ );
}
else {
LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
@@ -1201,11 +1202,10 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
/**
* Utility method to dispatch an asynchronous call on the invokable.
- *
- * @param runnable The async call runnable.
+ * @param runnable The async call runnable.
* @param callName The name of the call, for logging purposes.
*/
- private void executeAsyncCallRunnable(Runnable runnable, String callName, boolean blocking) {
+ private void executeAsyncCallRunnable(Runnable runnable, String callName) {
// make sure the executor is initialized. lock against concurrent calls to this function
synchronized (this) {
if (executionState != ExecutionState.RUNNING) {
@@ -1213,20 +1213,12 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
}
// get ourselves a reference on the stack that cannot be concurrently modified
- BlockingCallMonitoringThreadPool executor = this.asyncCallDispatcher;
+ ExecutorService executor = this.asyncCallDispatcher;
if (executor == null) {
// first time use, initialize
checkState(userCodeClassLoader != null, "userCodeClassLoader must not be null");
- // Under normal execution, we expect that one thread will suffice, this is why we
- // keep the core threads to 1. In the case of a synchronous savepoint, we will block
- // the checkpointing thread, so we need an additional thread to execute the
- // notifyCheckpointComplete() callback. Finally, we aggressively purge (potentially)
- // idle thread so that we do not risk to have many idle thread on machines with multiple
- // tasks on them. Either way, only one of them can execute at a time due to the
- // checkpoint lock.
-
- executor = new BlockingCallMonitoringThreadPool(
+ executor = Executors.newSingleThreadExecutor(
new DispatcherThreadFactory(
TASK_THREADS_GROUP,
"Async calls on " + taskNameWithSubtask,
@@ -1245,13 +1237,13 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
LOG.debug("Invoking async call {} on task {}", callName, taskNameWithSubtask);
try {
- executor.submit(runnable, blocking);
+ executor.submit(runnable);
}
catch (RejectedExecutionException e) {
// may be that we are concurrently finished or canceled.
// if not, report that something is fishy
if (executionState == ExecutionState.RUNNING) {
- throw new RuntimeException("Async call with a " + (blocking ? "" : "non-") + "blocking call was rejected, even though the task is running.", e);
+ throw new RuntimeException("Async call was rejected, even though the task is running.", e);
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
deleted file mode 100644
index 2cc3454..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link BlockingCallMonitoringThreadPool}.
- */
-public class BlockingCallMonitoringThreadPoolTest {
-
- private final static int TIME_OUT = 30;
-
- private final OneShotLatch latch1 = new OneShotLatch();
- private final OneShotLatch latch2 = new OneShotLatch();
- private BlockingCallMonitoringThreadPool blockingCallThreadPool = new BlockingCallMonitoringThreadPool();
-
- @Before
- public void setup() {
- blockingCallThreadPool = new BlockingCallMonitoringThreadPool();
- latch1.reset();
- latch2.reset();
- }
-
- @After
- public void tearDown() {
- latch1.trigger();
- latch2.trigger();
- blockingCallThreadPool.shutdown();
- }
-
- @Test
- public void testSubmitNonBlockingCalls() throws Exception {
- blockingCallThreadPool.submit(() -> await(latch1), false);
- blockingCallThreadPool.submit(() -> await(latch2), false);
-
- assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
- assertEquals(1, blockingCallThreadPool.getQueueSize());
- }
-
- @Test
- public void testSubmitBlockingCall() throws Exception {
- CompletableFuture<?> latch1Future = blockingCallThreadPool.submit(() -> await(latch1), true);
- CompletableFuture<?> latch2Future = blockingCallThreadPool.submit(() -> await(latch2), false);
-
- assertEquals(2, blockingCallThreadPool.getMaximumPoolSize());
- assertEquals(0, blockingCallThreadPool.getQueueSize());
-
- latch2.trigger();
- latch2Future.get(TIME_OUT, TimeUnit.SECONDS);
-
- assertFalse(latch1Future.isDone());
- assertTrue(latch2Future.isDone());
- }
-
- @Test
- public void testDownsizePool() throws Exception {
- List<CompletableFuture<?>> futures = new ArrayList<>();
-
- futures.add(blockingCallThreadPool.submit(() -> await(latch1), true));
- futures.add(blockingCallThreadPool.submit(() -> await(latch1), true));
- futures.add(blockingCallThreadPool.submit(() -> await(latch1), false));
-
- assertEquals(3, blockingCallThreadPool.getMaximumPoolSize());
-
- latch1.trigger();
-
- for (CompletableFuture<?> future : futures) {
- future.get(TIME_OUT, TimeUnit.SECONDS);
- }
-
- blockingCallThreadPool.submit(() -> await(latch1), false).get(TIME_OUT, TimeUnit.SECONDS);
- assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
- }
-
- private void await(OneShotLatch latch) {
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 5e43f68..f7b366b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -62,7 +62,6 @@ import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
@@ -130,16 +129,12 @@ public class TaskAsyncCallTest extends TestLogger {
}
// ------------------------------------------------------------------------
- // Tests
+ // Tests
// ------------------------------------------------------------------------
@Test
- @Ignore
public void testCheckpointCallsInOrder() throws Exception {
- // test ignored because with the changes introduced by [FLINK-11667],
- // there is not guarantee about the order in which checkpoints are executed.
-
Task task = createTask(CheckpointsInOrderInvokable.class);
try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
@@ -160,12 +155,8 @@ public class TaskAsyncCallTest extends TestLogger {
}
@Test
- @Ignore
public void testMixedAsyncCallsInOrder() throws Exception {
- // test ignored because with the changes introduced by [FLINK-11667],
- // there is not guarantee about the order in which checkpoints are executed.
-
Task task = createTask(CheckpointsInOrderInvokable.class);
try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();