You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/05 05:07:53 UTC

[flink] branch release-1.11 updated: [FLINK-18074][checkpoint] Ensure the task fails when an exception is thrown while notifying it of a completed/aborted checkpoint

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 29f9918  [FLINK-18074][checkpoint] Ensure the task fails when an exception is thrown while notifying it of a completed/aborted checkpoint
29f9918 is described below

commit 29f99180464e4dd1aea9e7b28a2616dda53adb04
Author: Yun Tang <my...@live.com>
AuthorDate: Wed Jun 3 23:49:45 2020 +0800

    [FLINK-18074][checkpoint] Ensure the task fails when an exception is thrown while notifying it of a completed/aborted checkpoint
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 38 ++++++++++++-----
 .../streaming/runtime/tasks/StreamTaskTest.java    | 48 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 10 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7ebb9e5..d9add56 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -76,6 +76,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
@@ -923,9 +924,33 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	@Override
 	public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
-		return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
-				() -> notifyCheckpointComplete(checkpointId),
-				"checkpoint %d complete", checkpointId);
+		return notifyCheckpointOperation(
+			() -> notifyCheckpointComplete(checkpointId),
+			String.format("checkpoint %d complete", checkpointId));
+	}
+
+	@Override
+	public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
+		return notifyCheckpointOperation(
+			() -> subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, this::isRunning),
+			String.format("checkpoint %d aborted", checkpointId));
+	}
+
+	private Future<Void> notifyCheckpointOperation(RunnableWithException runnable, String description) {
+		CompletableFuture<Void> result = new CompletableFuture<>();
+		mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(
+			() -> {
+				try {
+					runnable.run();
+				}
+				catch (Exception ex) {
+					result.completeExceptionally(ex);
+					throw ex;
+				}
+				result.complete(null);
+			},
+			description);
+		return result;
 	}
 
 	private void notifyCheckpointComplete(long checkpointId) throws Exception {
@@ -937,13 +962,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
-	@Override
-	public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
-		return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
-			() -> subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, this::isRunning),
-			"checkpoint %d aborted", checkpointId);
-	}
-
 	private void tryShutdownTimerService() {
 
 		if (!timerService.isTerminated()) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e46dc0b..023f801 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -148,6 +148,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
@@ -969,6 +970,34 @@ public class StreamTaskTest extends TestLogger {
 		assertEquals(true, operator.closed.get());
 	}
 
+	@Test
+	public void testFailToConfirmCheckpointCompleted() throws Exception {
+		testFailToConfirmCheckpointMessage(streamTask -> streamTask.notifyCheckpointCompleteAsync(1L));
+	}
+
+	@Test
+	public void testFailToConfirmCheckpointAborted() throws Exception {
+		testFailToConfirmCheckpointMessage(streamTask -> streamTask.notifyCheckpointAbortAsync(1L));
+	}
+
+	private void testFailToConfirmCheckpointMessage(Consumer<StreamTask<?, ?>> consumer) throws Exception {
+		FailOnNotifyCheckpointOperator<Integer> operator = new FailOnNotifyCheckpointOperator<>();
+		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
+			new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+				.addInput(BasicTypeInfo.INT_TYPE_INFO);
+		StreamTaskMailboxTestHarness<Integer> harness = builder
+			.setupOutputForSingletonOperatorChain(operator)
+			.build();
+
+		try {
+			consumer.accept(harness.streamTask);
+			harness.streamTask.runMailboxStep();
+			fail();
+		} catch (ExpectedTestException expected) {
+			// expected exception
+		}
+	}
+
 	/**
 	 * Tests that checkpoints are declined if operators are (partially) closed.
 	 *
@@ -2017,4 +2046,23 @@ public class StreamTaskTest extends TestLogger {
 		public void processElement(StreamRecord<T> element) throws Exception {
 		}
 	}
+
+	private static class FailOnNotifyCheckpointOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			super.notifyCheckpointComplete(checkpointId);
+			throw new ExpectedTestException();
+		}
+
+		@Override
+		public void notifyCheckpointAborted(long checkpointId) throws Exception {
+			super.notifyCheckpointAborted(checkpointId);
+			throw new ExpectedTestException();
+		}
+
+		@Override
+		public void processElement(StreamRecord<T> element) throws Exception {
+
+		}
+	}
 }