You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/22 15:57:21 UTC
[flink] 02/02: [FLINK-18403][checkpointing] Ensure that unaligned checkpointing is only activated for EXACTLY_ONCE.
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6b4d170b7872b1fe3451ebe7b675a12f3343a82a
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Mon Jun 22 11:51:42 2020 +0200
[FLINK-18403][checkpointing] Ensure that unaligned checkpointing is only activated for EXACTLY_ONCE.
---
.../api/graph/StreamingJobGraphGenerator.java | 7 ++++-
.../streaming/runtime/io/InputProcessorUtil.java | 4 +++
.../api/graph/StreamingJobGraphGeneratorTest.java | 32 ++++++++++++++++++++++
3 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 9022eaa..0e0b4b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -224,6 +224,11 @@ public class StreamingJobGraphGenerator {
}
}
}
+
+ if (checkpointConfig.isUnalignedCheckpointsEnabled() && getCheckpointingMode(checkpointConfig) != CheckpointingMode.EXACTLY_ONCE) {
+ LOG.warn("Unaligned checkpoints can only be used with checkpointing mode EXACTLY_ONCE");
+ checkpointConfig.enableUnalignedCheckpoints(false);
+ }
}
private void setPhysicalEdges() {
@@ -500,8 +505,8 @@ public class StreamingJobGraphGenerator {
config.setStateBackend(streamGraph.getStateBackend());
config.setCheckpointingEnabled(checkpointCfg.isCheckpointingEnabled());
- config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
config.setCheckpointMode(getCheckpointingMode(checkpointCfg));
+ config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
for (int i = 0; i < vertex.getStatePartitioners().length; i++) {
config.setStatePartitioner(i, vertex.getStatePartitioners()[i]);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 3ed8584..4969edc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -111,6 +111,10 @@ public class InputProcessorUtil {
}
return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates);
case AT_LEAST_ONCE:
+ if (config.isUnalignedCheckpointsEnabled()) {
+ throw new IllegalStateException("Cannot use unaligned checkpoints with AT_LEAST_ONCE " +
+ "checkpointing mode");
+ }
int numInputChannels = Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum();
return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
default:
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index eca6883..42edc70 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -193,6 +193,38 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
}
@Test
+ public void testEnabledUnalignedCheckAndDisabledCheckpointing() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements(0).print();
+ StreamGraph streamGraph = env.getStreamGraph();
+ assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
+ env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+
+ JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+ List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+ StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
+ assertEquals(CheckpointingMode.AT_LEAST_ONCE, streamConfig.getCheckpointMode());
+ assertFalse(streamConfig.isUnalignedCheckpointsEnabled());
+ }
+
+ @Test
+ public void testUnalignedCheckAndAtLeastOnce() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements(0).print();
+ StreamGraph streamGraph = env.getStreamGraph();
+ env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
+ env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+
+ JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+ List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+ StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
+ assertEquals(CheckpointingMode.AT_LEAST_ONCE, streamConfig.getCheckpointMode());
+ assertFalse(streamConfig.isUnalignedCheckpointsEnabled());
+ }
+
+ @Test
public void generatorForwardsSavepointRestoreSettings() {
StreamGraph streamGraph = new StreamGraph(
new ExecutionConfig(),