You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/15 12:20:30 UTC

[flink] 02/04: [FLINK-13205][runtime] Make checkpoints injection ordered again (partial revert of FLINK-11458): use single threaded Task's dispatcher thread pool

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4f1837297e8f4ffae09b04ad68bd936db1d1231d
Author: Aleksey Pak <al...@ververica.com>
AuthorDate: Tue Jun 4 14:38:13 2019 +0200

    [FLINK-13205][runtime] Make checkpoints injection ordered again (partial revert of FLINK-11458): use single threaded Task's dispatcher thread pool
---
 .../BlockingCallMonitoringThreadPool.java          | 127 ---------------------
 .../org/apache/flink/runtime/taskmanager/Task.java |  36 +++---
 .../BlockingCallMonitoringThreadPoolTest.java      | 112 ------------------
 .../runtime/taskmanager/TaskAsyncCallTest.java     |  11 +-
 4 files changed, 15 insertions(+), 271 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
deleted file mode 100644
index d0fb868..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.annotation.VisibleForTesting;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A Thread Pool used to monitor the number of in-flight calls that block and wait for another task executed
- * by the same pool in order to get unblocked. When a call (blocking or non-blocking) is submitted, the size
- * of the pool is set to {@code 1 + activeBlockingCalls}. This allows the thread pool size to follow the needs
- * of the system and to avoid any redundant idle threads consuming resources.
- */
-public class BlockingCallMonitoringThreadPool {
-
-	private static final Logger LOG = LoggerFactory.getLogger(BlockingCallMonitoringThreadPool.class);
-
-	private final AtomicInteger inFlightBlockingCallCounter = new AtomicInteger(0);
-
-	private final ThreadPoolExecutor executor;
-
-	public BlockingCallMonitoringThreadPool() {
-		this(Executors.defaultThreadFactory());
-	}
-
-	public BlockingCallMonitoringThreadPool(final ThreadFactory dispatcherThreadFactory) {
-		this.executor = new ThreadPoolExecutor(
-				1,
-				1,
-				10L,
-				TimeUnit.SECONDS,
-				new LinkedBlockingQueue<>(),
-				checkNotNull(dispatcherThreadFactory));
-	}
-
-	public CompletableFuture<?> submit(final Runnable runnable, final boolean blocking) {
-		if (blocking) {
-			return submitBlocking(runnable);
-		} else {
-			return submit(runnable);
-		}
-	}
-
-	private CompletableFuture<?> submit(final Runnable task) {
-		adjustThreadPoolSize(inFlightBlockingCallCounter.get());
-		return CompletableFuture.runAsync(task, executor);
-	}
-
-	private CompletableFuture<?> submitBlocking(final Runnable task) {
-		adjustThreadPoolSize(inFlightBlockingCallCounter.incrementAndGet());
-		return CompletableFuture.runAsync(task, executor).whenComplete(
-				(ignored, e) -> inFlightBlockingCallCounter.decrementAndGet());
-	}
-
-	private void adjustThreadPoolSize(final int activeBlockingCalls) {
-		if (activeBlockingCalls > 1) {
-			LOG.debug("There are {} active threads with blocking calls", activeBlockingCalls);
-		}
-
-		final int newPoolSize = 1 + activeBlockingCalls;
-
-		// We have to reset the core pool size because (quoted from the official docs):
-		// ``
-		// If there are more than corePoolSize but less than maximumPoolSize threads running,
-		// ** a new thread will be created ONLY IF THE QUEUE IS FULL **.
-		// ``
-
-		// ensure that regardless of whether we increase/reduce the pool size, maximum is always >= core
-		if (newPoolSize < executor.getCorePoolSize()) {
-			executor.setCorePoolSize(newPoolSize);
-			executor.setMaximumPoolSize(newPoolSize);
-		} else {
-			executor.setMaximumPoolSize(newPoolSize);
-			executor.setCorePoolSize(newPoolSize);
-		}
-	}
-
-	public void shutdown() {
-		executor.shutdown();
-	}
-
-	public boolean isShutdown() {
-		return executor.isShutdown();
-	}
-
-	public void shutdownNow() {
-		executor.shutdownNow();
-	}
-
-	@VisibleForTesting
-	int getMaximumPoolSize() {
-		return executor.getMaximumPoolSize();
-	}
-
-	@VisibleForTesting
-	int getQueueSize() {
-		return executor.getQueue().size();
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 4355821..d4e1d8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -93,6 +93,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -257,8 +259,8 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 	/** The observed exception, in case the task execution failed. */
 	private volatile Throwable failureCause;
 
-	/** Executor for asynchronous calls (checkpoints, etc), lazily initialized. */
-	private volatile BlockingCallMonitoringThreadPool asyncCallDispatcher;
+	/** Serial executor for asynchronous calls (checkpoints, etc), lazily initialized. */
+	private volatile ExecutorService asyncCallDispatcher;
 
 	/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
 	private long taskCancellationInterval;
@@ -789,7 +791,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 
 				// stop the async dispatcher.
 				// copy dispatcher reference to stack, against concurrent release
-				final BlockingCallMonitoringThreadPool dispatcher = this.asyncCallDispatcher;
+				ExecutorService dispatcher = this.asyncCallDispatcher;
 				if (dispatcher != null && !dispatcher.isShutdown()) {
 					dispatcher.shutdownNow();
 				}
@@ -1153,8 +1155,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 			};
 			executeAsyncCallRunnable(
 					runnable,
-					String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId),
-					checkpointOptions.getCheckpointType().isSynchronous());
+					String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
 		}
 		else {
 			LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
@@ -1189,8 +1190,8 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 			};
 			executeAsyncCallRunnable(
 					runnable,
-					"Checkpoint Confirmation for " + taskNameWithSubtask,
-					false);
+					"Checkpoint Confirmation for " + taskNameWithSubtask
+			);
 		}
 		else {
 			LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
@@ -1201,11 +1202,10 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 
 	/**
 	 * Utility method to dispatch an asynchronous call on the invokable.
-	 *
-	 * @param runnable The async call runnable.
+	 * @param runnable The async call runnable.
 	 * @param callName The name of the call, for logging purposes.
 	 */
-	private void executeAsyncCallRunnable(Runnable runnable, String callName, boolean blocking) {
+	private void executeAsyncCallRunnable(Runnable runnable, String callName) {
 		// make sure the executor is initialized. lock against concurrent calls to this function
 		synchronized (this) {
 			if (executionState != ExecutionState.RUNNING) {
@@ -1213,20 +1213,12 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 			}
 
 			// get ourselves a reference on the stack that cannot be concurrently modified
-			BlockingCallMonitoringThreadPool executor = this.asyncCallDispatcher;
+			ExecutorService executor = this.asyncCallDispatcher;
 			if (executor == null) {
 				// first time use, initialize
 				checkState(userCodeClassLoader != null, "userCodeClassLoader must not be null");
 
-				// Under normal execution, we expect that one thread will suffice, this is why we
-				// keep the core threads to 1. In the case of a synchronous savepoint, we will block
-				// the checkpointing thread, so we need an additional thread to execute the
-				// notifyCheckpointComplete() callback. Finally, we aggressively purge (potentially)
-				// idle thread so that we do not risk to have many idle thread on machines with multiple
-				// tasks on them. Either way, only one of them can execute at a time due to the
-				// checkpoint lock.
-
-				executor = new BlockingCallMonitoringThreadPool(
+				executor = Executors.newSingleThreadExecutor(
 						new DispatcherThreadFactory(
 							TASK_THREADS_GROUP,
 							"Async calls on " + taskNameWithSubtask,
@@ -1245,13 +1237,13 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 			LOG.debug("Invoking async call {} on task {}", callName, taskNameWithSubtask);
 
 			try {
-				executor.submit(runnable, blocking);
+				executor.submit(runnable);
 			}
 			catch (RejectedExecutionException e) {
 				// may be that we are concurrently finished or canceled.
 				// if not, report that something is fishy
 				if (executionState == ExecutionState.RUNNING) {
-					throw new RuntimeException("Async call with a " + (blocking ? "" : "non-") + "blocking call was rejected, even though the task is running.", e);
+					throw new RuntimeException("Async call was rejected, even though the task is running.", e);
 				}
 			}
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
deleted file mode 100644
index 2cc3454..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link BlockingCallMonitoringThreadPool}.
- */
-public class BlockingCallMonitoringThreadPoolTest {
-
-	private final static int TIME_OUT = 30;
-
-	private final OneShotLatch latch1 = new OneShotLatch();
-	private final OneShotLatch latch2 = new OneShotLatch();
-	private BlockingCallMonitoringThreadPool blockingCallThreadPool = new BlockingCallMonitoringThreadPool();
-
-	@Before
-	public void setup() {
-		blockingCallThreadPool = new BlockingCallMonitoringThreadPool();
-		latch1.reset();
-		latch2.reset();
-	}
-
-	@After
-	public void tearDown() {
-		latch1.trigger();
-		latch2.trigger();
-		blockingCallThreadPool.shutdown();
-	}
-
-	@Test
-	public void testSubmitNonBlockingCalls() throws Exception {
-		blockingCallThreadPool.submit(() -> await(latch1), false);
-		blockingCallThreadPool.submit(() -> await(latch2), false);
-
-		assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
-		assertEquals(1, blockingCallThreadPool.getQueueSize());
-	}
-
-	@Test
-	public void testSubmitBlockingCall() throws Exception {
-		CompletableFuture<?> latch1Future = blockingCallThreadPool.submit(() -> await(latch1), true);
-		CompletableFuture<?> latch2Future = blockingCallThreadPool.submit(() -> await(latch2), false);
-
-		assertEquals(2, blockingCallThreadPool.getMaximumPoolSize());
-		assertEquals(0, blockingCallThreadPool.getQueueSize());
-
-		latch2.trigger();
-		latch2Future.get(TIME_OUT, TimeUnit.SECONDS);
-
-		assertFalse(latch1Future.isDone());
-		assertTrue(latch2Future.isDone());
-	}
-
-	@Test
-	public void testDownsizePool() throws Exception {
-		List<CompletableFuture<?>> futures = new ArrayList<>();
-
-		futures.add(blockingCallThreadPool.submit(() -> await(latch1), true));
-		futures.add(blockingCallThreadPool.submit(() -> await(latch1), true));
-		futures.add(blockingCallThreadPool.submit(() -> await(latch1), false));
-
-		assertEquals(3, blockingCallThreadPool.getMaximumPoolSize());
-
-		latch1.trigger();
-
-		for (CompletableFuture<?> future : futures) {
-			future.get(TIME_OUT, TimeUnit.SECONDS);
-		}
-
-		blockingCallThreadPool.submit(() -> await(latch1), false).get(TIME_OUT, TimeUnit.SECONDS);
-		assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
-	}
-
-	private void await(OneShotLatch latch) {
-		try {
-			latch.await();
-		} catch (InterruptedException e) {
-			throw new RuntimeException(e);
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 5e43f68..f7b366b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -62,7 +62,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -130,16 +129,12 @@ public class TaskAsyncCallTest extends TestLogger {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Tests 
+	//  Tests
 	// ------------------------------------------------------------------------
 
 	@Test
-	@Ignore
 	public void testCheckpointCallsInOrder() throws Exception {
 
-		// test ignored because with the changes introduced by [FLINK-11667],
-		// there is not guarantee about the order in which checkpoints are executed.
-
 		Task task = createTask(CheckpointsInOrderInvokable.class);
 		try (TaskCleaner ignored = new TaskCleaner(task)) {
 			task.startTaskThread();
@@ -160,12 +155,8 @@ public class TaskAsyncCallTest extends TestLogger {
 	}
 
 	@Test
-	@Ignore
 	public void testMixedAsyncCallsInOrder() throws Exception {
 
-		// test ignored because with the changes introduced by [FLINK-11667],
-		// there is not guarantee about the order in which checkpoints are executed.
-
 		Task task = createTask(CheckpointsInOrderInvokable.class);
 		try (TaskCleaner ignored = new TaskCleaner(task)) {
 			task.startTaskThread();