You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/22 15:57:21 UTC

[flink] 02/02: [FLINK-18403][checkpointing] Ensure that unaligned checkpointing is only activated for EXACTLY_ONCE.

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6b4d170b7872b1fe3451ebe7b675a12f3343a82a
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Mon Jun 22 11:51:42 2020 +0200

    [FLINK-18403][checkpointing] Ensure that unaligned checkpointing is only activated for EXACTLY_ONCE.
---
 .../api/graph/StreamingJobGraphGenerator.java      |  7 ++++-
 .../streaming/runtime/io/InputProcessorUtil.java   |  4 +++
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 32 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 9022eaa..0e0b4b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -224,6 +224,11 @@ public class StreamingJobGraphGenerator {
 				}
 			}
 		}
+
+		if (checkpointConfig.isUnalignedCheckpointsEnabled() && getCheckpointingMode(checkpointConfig) != CheckpointingMode.EXACTLY_ONCE) {
+			LOG.warn("Unaligned checkpoints can only be used with checkpointing mode EXACTLY_ONCE");
+			checkpointConfig.enableUnalignedCheckpoints(false);
+		}
 	}
 
 	private void setPhysicalEdges() {
@@ -500,8 +505,8 @@ public class StreamingJobGraphGenerator {
 
 		config.setStateBackend(streamGraph.getStateBackend());
 		config.setCheckpointingEnabled(checkpointCfg.isCheckpointingEnabled());
-		config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
 		config.setCheckpointMode(getCheckpointingMode(checkpointCfg));
+		config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
 
 		for (int i = 0; i < vertex.getStatePartitioners().length; i++) {
 			config.setStatePartitioner(i, vertex.getStatePartitioners()[i]);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 3ed8584..4969edc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -111,6 +111,10 @@ public class InputProcessorUtil {
 				}
 				return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates);
 			case AT_LEAST_ONCE:
+				if (config.isUnalignedCheckpointsEnabled()) {
+					throw new IllegalStateException("Cannot use unaligned checkpoints with AT_LEAST_ONCE " +
+						"checkpointing mode");
+				}
 				int numInputChannels = Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum();
 				return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
 			default:
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index eca6883..42edc70 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -193,6 +193,38 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	}
 
 	@Test
+	public void testEnabledUnalignedCheckAndDisabledCheckpointing() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(0).print();
+		StreamGraph streamGraph = env.getStreamGraph();
+		assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
+		env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
+		assertEquals(CheckpointingMode.AT_LEAST_ONCE, streamConfig.getCheckpointMode());
+		assertFalse(streamConfig.isUnalignedCheckpointsEnabled());
+	}
+
+	@Test
+	public void testUnalignedCheckAndAtLeastOnce() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(0).print();
+		StreamGraph streamGraph = env.getStreamGraph();
+		env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
+		env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
+		assertEquals(CheckpointingMode.AT_LEAST_ONCE, streamConfig.getCheckpointMode());
+		assertFalse(streamConfig.isUnalignedCheckpointsEnabled());
+	}
+
+	@Test
 	public void generatorForwardsSavepointRestoreSettings() {
 		StreamGraph streamGraph = new StreamGraph(
 				new ExecutionConfig(),