You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/05 08:40:24 UTC

[flink] 04/04: [FLINK-12889] Set FatalExitExceptionHandler for StreamTask#asyncOperationsThreadPool

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 28ac5ef7f1769e9256b3089ed4853736771ae457
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Jul 2 14:57:21 2019 +0200

    [FLINK-12889] Set FatalExitExceptionHandler for StreamTask#asyncOperationsThreadPool
    
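    To avoid swallowing uncaught exceptions in asynchronous checkpoint operations, this commit
    sets the FatalExitExceptionHandler as the uncaught exception handler for the threads of the
    StreamTask#asyncOperationsThreadPool. The AsyncCheckpointRunnable is now started via
    execute() instead of submit(), because submit() captures exceptions in the returned Future
    so that they would never reach the uncaught exception handler.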
    
    For testing purposes, the uncaught exception handler was made configurable in the StreamTask.
    
    This closes #8948.
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 23 ++++++-
 .../runtime/tasks/ExceptionallyDoneFuture.java     | 69 +++++++++++++++++++
 .../streaming/runtime/tasks/StreamTaskTest.java    | 77 ++++++++++++++++++++--
 3 files changed, 159 insertions(+), 10 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8c72277..2f4dd6a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -42,6 +42,8 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateBackendLoader;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -171,6 +173,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	 */
 	protected ProcessingTimeService timerService;
 
+	private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+
 	/** The map of user-defined accumulators of this task. */
 	private final Map<String, Accumulator<?, ?>> accumulatorMap;
 
@@ -212,20 +216,33 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	/**
 	 * Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
 	 *
+	 * @param env The task environment for this task.
+	 * @param timeProvider Optionally, a specific time provider to use.
+	 */
+	protected StreamTask(Environment env, @Nullable ProcessingTimeService timeProvider) {
+		this(env, timeProvider, FatalExitExceptionHandler.INSTANCE);
+	}
+
+	/**
+	 * Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
+	 *
 	 * <p>This constructor accepts a special {@link ProcessingTimeService}. By default (and if
 	 * null is passes for the time provider) a {@link SystemProcessingTimeService DefaultTimerService}
 	 * will be used.
 	 *
 	 * @param environment The task environment for this task.
 	 * @param timeProvider Optionally, a specific time provider to use.
+	 * @param uncaughtExceptionHandler to handle uncaught exceptions in the async operations thread pool
 	 */
 	protected StreamTask(
 			Environment environment,
-			@Nullable ProcessingTimeService timeProvider) {
+			@Nullable ProcessingTimeService timeProvider,
+			Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
 
 		super(environment);
 
 		this.timerService = timeProvider;
+		this.uncaughtExceptionHandler = Preconditions.checkNotNull(uncaughtExceptionHandler);
 		this.configuration = new StreamConfig(getTaskConfiguration());
 		this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
 		this.recordWriters = createRecordWriters(configuration, environment);
@@ -332,7 +349,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// -------- Initialize ---------
 			LOG.debug("Initializing {}.", getName());
 
-			asyncOperationsThreadPool = Executors.newCachedThreadPool();
+			asyncOperationsThreadPool = Executors.newCachedThreadPool(new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
 
 			CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
 
@@ -1204,7 +1221,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					startAsyncPartNano);
 
 				owner.cancelables.registerCloseable(asyncCheckpointRunnable);
-				owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
+				owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("{} - finished synchronous part of checkpoint {}. " +
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java
new file mode 100644
index 0000000..55bfc18
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of a {@link RunnableFuture} which can only complete exceptionally.
+ *
+ * @param <V> type of the RunnableFuture
+ */
+class ExceptionallyDoneFuture<V> implements RunnableFuture<V> {
+
+	private final Throwable throwable;
+
+	private ExceptionallyDoneFuture(Throwable throwable) {
+		this.throwable = throwable;
+	}
+
+	@Override
+	public void run() {}
+
+	@Override
+	public boolean cancel(boolean mayInterruptIfRunning) {
+		return false;
+	}
+
+	@Override
+	public boolean isCancelled() {
+		return false;
+	}
+
+	@Override
+	public boolean isDone() {
+		return true;
+	}
+
+	@Override
+	public V get() throws ExecutionException {
+		throw new ExecutionException(throwable);
+	}
+
+	@Override
+	public V get(long timeout, TimeUnit unit) throws ExecutionException {
+		return get();
+	}
+
+	public static <T> ExceptionallyDoneFuture<T> of(Throwable throwable) {
+		return new ExceptionallyDoneFuture<>(throwable);
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 80f47a6..601d72b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.TestingUncaughtExceptionHandler;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -94,6 +95,7 @@ import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -320,7 +322,7 @@ public class StreamTaskTest extends TestLogger {
 
 		final Exception testException = new Exception("Test exception");
 
-		RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+		RunningTask<MockStreamTask> task = runTask(() -> createMockStreamTask(
 			declineDummyEnvironment,
 			operatorChain(
 				streamOperatorWithSnapshot(operatorSnapshotResult1),
@@ -345,6 +347,44 @@ public class StreamTaskTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that uncaught exceptions in the async part of a checkpoint operation are forwarded
+	 * to the uncaught exception handler. See <a href="https://issues.apache.org/jira/browse/FLINK-12889">FLINK-12889</a>.
+	 */
+	@Test
+	public void testUncaughtExceptionInAsynchronousCheckpointingOperation() throws Exception {
+		final RuntimeException failingCause = new RuntimeException("Test exception");
+		FailingDummyEnvironment failingDummyEnvironment = new FailingDummyEnvironment(failingCause);
+
+		// mock the returned snapshots
+		OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(
+			ExceptionallyDoneFuture.of(failingCause),
+			DoneFuture.of(SnapshotResult.empty()),
+			DoneFuture.of(SnapshotResult.empty()),
+			DoneFuture.of(SnapshotResult.empty()));
+
+		final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
+
+		RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+			failingDummyEnvironment,
+			operatorChain(streamOperatorWithSnapshot(operatorSnapshotResult)),
+			uncaughtExceptionHandler));
+		MockStreamTask streamTask = task.streamTask;
+
+		waitTaskIsRunning(streamTask, task.invocationFuture);
+
+		streamTask.triggerCheckpoint(
+			new CheckpointMetaData(42L, 1L),
+			CheckpointOptions.forCheckpointWithDefaultLocation(),
+			false);
+
+		final Throwable uncaughtException = uncaughtExceptionHandler.waitForUncaughtException();
+		assertThat(uncaughtException, is(failingCause));
+
+		streamTask.finishInput();
+		task.waitForTaskCompletion(false);
+	}
+
+	/**
 	 * Tests that in case of a failing AsyncCheckpointRunnable all operator snapshot results are
 	 * cancelled and all non partitioned state handles are discarded.
 	 */
@@ -362,7 +402,7 @@ public class StreamTaskTest extends TestLogger {
 		when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
 
 		try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build()) {
-			RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+			RunningTask<MockStreamTask> task = runTask(() -> createMockStreamTask(
 				mockEnvironment,
 				operatorChain(
 					streamOperatorWithSnapshot(operatorSnapshotResult1),
@@ -453,7 +493,7 @@ public class StreamTaskTest extends TestLogger {
 			.setTaskStateManager(taskStateManager)
 			.build()) {
 
-			RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+			RunningTask<MockStreamTask> task = runTask(() -> createMockStreamTask(
 				mockEnvironment,
 				operatorChain(streamOperatorWithSnapshot(operatorSnapshotResult))));
 
@@ -535,7 +575,7 @@ public class StreamTaskTest extends TestLogger {
 
 		final AcknowledgeDummyEnvironment mockEnvironment = new AcknowledgeDummyEnvironment();
 
-		RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+		RunningTask<MockStreamTask> task = runTask(() -> createMockStreamTask(
 			mockEnvironment,
 			operatorChain(streamOperator)));
 
@@ -614,7 +654,7 @@ public class StreamTaskTest extends TestLogger {
 			.setTaskStateManager(taskStateManager)
 			.build()) {
 
-			RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+			RunningTask<MockStreamTask> task = runTask(() -> createMockStreamTask(
 				mockEnvironment,
 				operatorChain(statelessOperator)));
 
@@ -980,8 +1020,8 @@ public class StreamTaskTest extends TestLogger {
 		private final OperatorChain<String, AbstractStreamOperator<String>> overrideOperatorChain;
 		private volatile boolean inputFinished;
 
-		MockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain) {
-			super(env, null);
+		MockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+			super(env, null, uncaughtExceptionHandler);
 			this.overrideOperatorChain = operatorChain;
 		}
 
@@ -1009,6 +1049,10 @@ public class StreamTaskTest extends TestLogger {
 		}
 	}
 
+	private static MockStreamTask createMockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain) {
+		return new MockStreamTask(env, operatorChain, FatalExitExceptionHandler.INSTANCE);
+	}
+
 	/**
 	 * Source that instantiates the operator state backend and the keyed state backend.
 	 * The created state backends can be retrieved from the static fields to check if the
@@ -1461,4 +1505,23 @@ public class StreamTaskTest extends TestLogger {
 			signalRunLatch.await();
 		}
 	}
+
+	private static class FailingDummyEnvironment extends DummyEnvironment {
+
+		final RuntimeException failingCause;
+
+		private FailingDummyEnvironment(RuntimeException failingCause) {
+			this.failingCause = failingCause;
+		}
+
+		@Override
+		public void declineCheckpoint(long checkpointId, Throwable cause) {
+			throw failingCause;
+		}
+
+		@Override
+		public void failExternally(Throwable cause) {
+			throw failingCause;
+		}
+	}
 }