You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/05 08:40:24 UTC
[flink] 04/04: [FLINK-12889] Set FatalExitExceptionHandler for
StreamTask#asyncOperationsThreadPool
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 28ac5ef7f1769e9256b3089ed4853736771ae457
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Jul 2 14:57:21 2019 +0200
[FLINK-12889] Set FatalExitExceptionHandler for StreamTask#asyncOperationsThreadPool
In order to avoid the swallowing of uncaught exceptions in asynchronous checkpoint operations,
this commit sets the FatalExitExceptionHandler for the StreamTask#asyncOperationsThreadPool.
For testing purposes the uncaught exception handler was made configurable in the StreamTask.
This closes #8948.
---
.../flink/streaming/runtime/tasks/StreamTask.java | 23 ++++++-
.../runtime/tasks/ExceptionallyDoneFuture.java | 69 +++++++++++++++++++
.../streaming/runtime/tasks/StreamTaskTest.java | 77 ++++++++++++++++++++--
3 files changed, 159 insertions(+), 10 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8c72277..2f4dd6a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -42,6 +42,8 @@ import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -171,6 +173,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
*/
protected ProcessingTimeService timerService;
+ private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+
/** The map of user-defined accumulators of this task. */
private final Map<String, Accumulator<?, ?>> accumulatorMap;
@@ -212,20 +216,33 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
/**
* Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
*
+ * @param env The task environment for this task.
+ * @param timeProvider Optionally, a specific time provider to use.
+ */
+ protected StreamTask(Environment env, @Nullable ProcessingTimeService timeProvider) {
+ this(env, timeProvider, FatalExitExceptionHandler.INSTANCE); // default uncaught-exception handler for the async operations thread pool
+ }
+
+ /**
+ * Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
+ *
* <p>This constructor accepts a special {@link ProcessingTimeService}. By default (and if
* null is passed for the time provider) a {@link SystemProcessingTimeService DefaultTimerService}
* will be used.
*
* @param environment The task environment for this task.
* @param timeProvider Optionally, a specific time provider to use.
+ * @param uncaughtExceptionHandler to handle uncaught exceptions in the async operations thread pool
*/
protected StreamTask(
Environment environment,
- @Nullable ProcessingTimeService timeProvider) {
+ @Nullable ProcessingTimeService timeProvider,
+ Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
super(environment);
this.timerService = timeProvider;
+ this.uncaughtExceptionHandler = Preconditions.checkNotNull(uncaughtExceptionHandler);
this.configuration = new StreamConfig(getTaskConfiguration());
this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
this.recordWriters = createRecordWriters(configuration, environment);
@@ -332,7 +349,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// -------- Initialize ---------
LOG.debug("Initializing {}.", getName());
- asyncOperationsThreadPool = Executors.newCachedThreadPool();
+ asyncOperationsThreadPool = Executors.newCachedThreadPool(new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
@@ -1204,7 +1221,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
startAsyncPartNano);
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
- owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
+ owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished synchronous part of checkpoint {}. " +
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java
new file mode 100644
index 0000000..55bfc18
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of a {@link RunnableFuture} which can only complete exceptionally.
+ *
+ * @param <V> type of the RunnableFuture
+ */
+class ExceptionallyDoneFuture<V> implements RunnableFuture<V> {
+
+ private final Throwable throwable; // failure cause wrapped by every get() call
+
+ private ExceptionallyDoneFuture(Throwable throwable) { // private: callers use the static factory of(Throwable)
+ this.throwable = throwable;
+ }
+
+ @Override
+ public void run() {} // no-op: the outcome is fixed at construction, nothing is computed
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false; // never cancels; the argument is ignored
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false; // consistent with cancel() always returning false
+ }
+
+ @Override
+ public boolean isDone() {
+ return true; // reported as done from the moment of construction
+ }
+
+ @Override
+ public V get() throws ExecutionException {
+ throw new ExecutionException(throwable); // never returns normally; always wraps the stored cause
+ }
+
+ @Override
+ public V get(long timeout, TimeUnit unit) throws ExecutionException {
+ return get(); // timeout and unit are ignored; get() throws immediately
+ }
+
+ public static <T> ExceptionallyDoneFuture<T> of(Throwable throwable) { // static factory wrapping the private constructor
+ return new ExceptionallyDoneFuture<>(throwable);
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 80f47a6..601d72b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.TestingUncaughtExceptionHandler;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
@@ -94,6 +95,7 @@ import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -320,7 +322,7 @@ public class StreamTaskTest extends TestLogger {
final Exception testException = new Exception("Test exception");
- RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+ RunningTask<MockStreamTask> task = runTask(() -> createMockStreamTask(
declineDummyEnvironment,
operatorChain(
streamOperatorWithSnapshot(operatorSnapshotResult1),
@@ -345,6 +347,44 @@ public class StreamTaskTest extends TestLogger {
}
/**
+ * Tests that uncaught exceptions in the async part of a checkpoint operation are forwarded
+ * to the uncaught exception handler. See <a href="https://issues.apache.org/jira/browse/FLINK-12889">FLINK-12889</a>.
+ */
+ @Test
+ public void testUncaughtExceptionInAsynchronousCheckpointingOperation() throws Exception {
+ final RuntimeException failingCause = new RuntimeException("Test exception");
+ FailingDummyEnvironment failingDummyEnvironment = new FailingDummyEnvironment(failingCause); // declineCheckpoint/failExternally of this environment rethrow failingCause
+
+ // mock the returned snapshots
+ OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(
+ ExceptionallyDoneFuture.of(failingCause), // the only snapshot future that fails; its get() throws with failingCause as cause
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()));
+
+ final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler(); // replaces the FatalExitExceptionHandler default so the test can observe the exception
+
+ RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+ failingDummyEnvironment,
+ operatorChain(streamOperatorWithSnapshot(operatorSnapshotResult)),
+ uncaughtExceptionHandler));
+ MockStreamTask streamTask = task.streamTask;
+
+ waitTaskIsRunning(streamTask, task.invocationFuture);
+
+ streamTask.triggerCheckpoint( // starts the checkpoint whose asynchronous part is expected to fail
+ new CheckpointMetaData(42L, 1L),
+ CheckpointOptions.forCheckpointWithDefaultLocation(),
+ false);
+
+ final Throwable uncaughtException = uncaughtExceptionHandler.waitForUncaughtException(); // exception that reached the handler
+ assertThat(uncaughtException, is(failingCause)); // the handler must have received the injected failure
+
+ streamTask.finishInput(); // presumably lets the mock task's run loop end so it can complete -- confirm against MockStreamTask
+ task.waitForTaskCompletion(false);
+ }
+
+ /**
* Tests that in case of a failing AsyncCheckpointRunnable all operator snapshot results are
* cancelled and all non partitioned state handles are discarded.
*/
@@ -362,7 +402,7 @@ public class StreamTaskTest extends TestLogger {
when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build()) {
- RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+ RunningTask<MockStreamTask> task = runTask(() -> createMockStreamTask(
mockEnvironment,
operatorChain(
streamOperatorWithSnapshot(operatorSnapshotResult1),
@@ -453,7 +493,7 @@ public class StreamTaskTest extends TestLogger {
.setTaskStateManager(taskStateManager)
.build()) {
- RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+ RunningTask<MockStreamTask> task = runTask(() -> createMockStreamTask(
mockEnvironment,
operatorChain(streamOperatorWithSnapshot(operatorSnapshotResult))));
@@ -535,7 +575,7 @@ public class StreamTaskTest extends TestLogger {
final AcknowledgeDummyEnvironment mockEnvironment = new AcknowledgeDummyEnvironment();
- RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+ RunningTask<MockStreamTask> task = runTask(() -> createMockStreamTask(
mockEnvironment,
operatorChain(streamOperator)));
@@ -614,7 +654,7 @@ public class StreamTaskTest extends TestLogger {
.setTaskStateManager(taskStateManager)
.build()) {
- RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+ RunningTask<MockStreamTask> task = runTask(() -> createMockStreamTask(
mockEnvironment,
operatorChain(statelessOperator)));
@@ -980,8 +1020,8 @@ public class StreamTaskTest extends TestLogger {
private final OperatorChain<String, AbstractStreamOperator<String>> overrideOperatorChain;
private volatile boolean inputFinished;
- MockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain) {
- super(env, null);
+ MockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+ super(env, null, uncaughtExceptionHandler); // null time provider; the handler is forwarded to the StreamTask constructor
this.overrideOperatorChain = operatorChain;
}
@@ -1009,6 +1049,10 @@ public class StreamTaskTest extends TestLogger {
}
}
+ private static MockStreamTask createMockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain) {
+ return new MockStreamTask(env, operatorChain, FatalExitExceptionHandler.INSTANCE); // same default handler the two-argument StreamTask constructor uses
+ }
+
/**
* Source that instantiates the operator state backend and the keyed state backend.
* The created state backends can be retrieved from the static fields to check if the
@@ -1461,4 +1505,23 @@ public class StreamTaskTest extends TestLogger {
signalRunLatch.await();
}
}
+
+ private static class FailingDummyEnvironment extends DummyEnvironment { // test environment whose failure-reporting callbacks themselves throw
+
+ final RuntimeException failingCause; // thrown by both overridden callbacks below
+
+ private FailingDummyEnvironment(RuntimeException failingCause) {
+ this.failingCause = failingCause;
+ }
+
+ @Override
+ public void declineCheckpoint(long checkpointId, Throwable cause) {
+ throw failingCause; // both arguments are ignored; always throws the injected exception
+ }
+
+ @Override
+ public void failExternally(Throwable cause) {
+ throw failingCause; // the reported cause is ignored; always throws the injected exception
+ }
+ }
}