You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/17 13:11:04 UTC

[flink] branch release-1.11 updated: [FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker while executing checkpoint aborted by coordinator RPC

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new e7c634f   [FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker while executing checkpoint aborted by coordinator RPC
e7c634f is described below

commit e7c634f223c0a2c180ca5abe14a4661115f0afe6
Author: Yun Tang <my...@live.com>
AuthorDate: Wed Jun 17 21:10:02 2020 +0800

     [FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker while executing checkpoint aborted by coordinator RPC
    
    When the CheckpointCoordinator sends an abort-checkpoint RPC, the task skips executing the
    respective checkpoint even if it was already triggered. However, we also need to broadcast a
    CancelCheckpointMarker before returning; otherwise the downstream side would keep waiting
    for barrier alignment, possibly until deadlock.
    
    This closes #12664.
---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    |  4 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java    | 75 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index b2fe147..0d6d638 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -235,9 +235,11 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 			return;
 		}
 
-		// Step (0): Record the last triggered checkpointId.
+		// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.
 		lastCheckpointId = metadata.getCheckpointId();
 		if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
+			// broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.
+			operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
 			LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
 			return;
 		}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index f7655ea..00ef9a1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
@@ -28,7 +31,11 @@ import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -39,12 +46,15 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.TestCheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
 import org.apache.flink.streaming.util.MockStreamTaskBuilder;
@@ -53,6 +63,8 @@ import org.apache.flink.util.ExceptionUtils;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -219,6 +231,69 @@ public class SubtaskCheckpointCoordinatorTest {
 	}
 
 	@Test
+	public void testBroadcastCancelCheckpointMarkerOnAbortingFromCoordinator() throws Exception { // checks that checkpointState() emits a CancelCheckpointMarker for a checkpoint already notified as aborted
+		OneInputStreamTaskTestHarness<String, String> testHarness =
+			new OneInputStreamTaskTestHarness<>(
+				OneInputStreamTask::new,
+				1,
+				1,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setupOutputForSingletonOperatorChain();
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		streamConfig.setStreamOperator(new MapOperator());
+
+		testHarness.invoke();
+		testHarness.waitForTaskRunning();
+
+		MockEnvironment mockEnvironment = MockEnvironment.builder().build();
+		SubtaskCheckpointCoordinator subtaskCheckpointCoordinator = new MockSubtaskCheckpointCoordinatorBuilder()
+			.setEnvironment(mockEnvironment)
+			.build();
+
+		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(1, 4096);
+		ArrayList<Object> recordOrEvents = new ArrayList<>(); // handed to the collecting writer below and inspected by the assertions
+		StreamElementSerializer<String> stringStreamElementSerializer = new StreamElementSerializer<>(StringSerializer.INSTANCE);
+		ResultPartitionWriter resultPartitionWriter = new RecordOrEventCollectingResultPartitionWriter<>(
+			recordOrEvents, bufferProvider, stringStreamElementSerializer);
+		mockEnvironment.addOutputs(Collections.singletonList(resultPartitionWriter));
+
+		OneInputStreamTask<String, String> task = testHarness.getTask();
+		OperatorChain<String, OneInputStreamOperator<String, String>> operatorChain = new OperatorChain<>(
+			task, StreamTask.createRecordWriterDelegate(streamConfig, mockEnvironment)); // the chain's writers are built from mockEnvironment, which holds the collecting writer
+		long checkpointId = 42L;
+		// notify the coordinator that the checkpoint is aborted before checkpointState() is called for the same id.
+		subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
+		subtaskCheckpointCoordinator.checkpointState(
+			new CheckpointMetaData(checkpointId, System.currentTimeMillis()),
+			CheckpointOptions.forCheckpointWithDefaultLocation(),
+			new CheckpointMetrics(),
+			operatorChain,
+			() -> true);
+
+		assertEquals(1, recordOrEvents.size()); // exactly one element is expected to have been written
+		Object recordOrEvent = recordOrEvents.get(0);
+		// ensure that the single collected element is a CancelCheckpointMarker carrying the aborted checkpoint id.
+		assertTrue(recordOrEvent instanceof CancelCheckpointMarker);
+		assertEquals(checkpointId, ((CancelCheckpointMarker) recordOrEvent).getCheckpointId());
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+	}
+
+	private static class MapOperator extends StreamMap<String, String> { // test-only operator: a StreamMap wrapping an identity function
+		private static final long serialVersionUID = 1L;
+
+		public MapOperator() {
+			super((MapFunction<String, String>) value -> value); // identity mapping: each value is returned unchanged
+		}
+
+		@Override
+		public void notifyCheckpointAborted(long checkpointId) throws Exception { // empty override: this operator does nothing on an abort notification
+		}
+	}
+
+	@Test
 	public void testNotifyCheckpointAbortedDuringAsyncPhase() throws Exception {
 		MockEnvironment mockEnvironment = MockEnvironment.builder().build();
 		SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()