You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/05 05:07:53 UTC
[flink] branch release-1.11 updated: [FLINK-18074][checkpoint]
Ensure the task fails when an exception is thrown while notifying it of a
completed/aborted checkpoint
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 29f9918 [FLINK-18074][checkpoint] Ensure the task fails when an exception is thrown while notifying it of a completed/aborted checkpoint
29f9918 is described below
commit 29f99180464e4dd1aea9e7b28a2616dda53adb04
Author: Yun Tang <my...@live.com>
AuthorDate: Wed Jun 3 23:49:45 2020 +0800
[FLINK-18074][checkpoint] Ensure the task fails when an exception is thrown while notifying it of a completed/aborted checkpoint
---
.../flink/streaming/runtime/tasks/StreamTask.java | 38 ++++++++++++-----
.../streaming/runtime/tasks/StreamTaskTest.java | 48 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 10 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7ebb9e5..d9add56 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -76,6 +76,7 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
@@ -923,9 +924,33 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
@Override
public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
- return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
- () -> notifyCheckpointComplete(checkpointId),
- "checkpoint %d complete", checkpointId);
+ return notifyCheckpointOperation(
+ () -> notifyCheckpointComplete(checkpointId),
+ String.format("checkpoint %d complete", checkpointId));
+ }
+
+ @Override
+ public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
+ return notifyCheckpointOperation(
+ () -> subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, this::isRunning),
+ String.format("checkpoint %d aborted", checkpointId));
+ }
+
+ private Future<Void> notifyCheckpointOperation(RunnableWithException runnable, String description) {
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(
+ () -> {
+ try {
+ runnable.run();
+ }
+ catch (Exception ex) {
+ result.completeExceptionally(ex);
+ throw ex;
+ }
+ result.complete(null);
+ },
+ description);
+ return result;
}
private void notifyCheckpointComplete(long checkpointId) throws Exception {
@@ -937,13 +962,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- @Override
- public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
- return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
- () -> subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, this::isRunning),
- "checkpoint %d aborted", checkpointId);
- }
-
private void tryShutdownTimerService() {
if (!timerService.isTerminated()) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e46dc0b..023f801 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -148,6 +148,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
@@ -969,6 +970,34 @@ public class StreamTaskTest extends TestLogger {
assertEquals(true, operator.closed.get());
}
+ @Test
+ public void testFailToConfirmCheckpointCompleted() throws Exception {
+ testFailToConfirmCheckpointMessage(streamTask -> streamTask.notifyCheckpointCompleteAsync(1L));
+ }
+
+ @Test
+ public void testFailToConfirmCheckpointAborted() throws Exception {
+ testFailToConfirmCheckpointMessage(streamTask -> streamTask.notifyCheckpointAbortAsync(1L));
+ }
+
+ private void testFailToConfirmCheckpointMessage(Consumer<StreamTask<?, ?>> consumer) throws Exception {
+ FailOnNotifyCheckpointOperator<Integer> operator = new FailOnNotifyCheckpointOperator<>();
+ MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
+ new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+ .addInput(BasicTypeInfo.INT_TYPE_INFO);
+ StreamTaskMailboxTestHarness<Integer> harness = builder
+ .setupOutputForSingletonOperatorChain(operator)
+ .build();
+
+ try {
+ consumer.accept(harness.streamTask);
+ harness.streamTask.runMailboxStep();
+ fail();
+ } catch (ExpectedTestException expected) {
+ // expected exception
+ }
+ }
+
/**
* Tests that checkpoints are declined if operators are (partially) closed.
*
@@ -2017,4 +2046,23 @@ public class StreamTaskTest extends TestLogger {
public void processElement(StreamRecord<T> element) throws Exception {
}
}
+
+ private static class FailOnNotifyCheckpointOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ super.notifyCheckpointComplete(checkpointId);
+ throw new ExpectedTestException();
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ super.notifyCheckpointAborted(checkpointId);
+ throw new ExpectedTestException();
+ }
+
+ @Override
+ public void processElement(StreamRecord<T> element) throws Exception {
+
+ }
+ }
}