You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/22 15:57:19 UTC

[flink] branch release-1.11 updated (505df8d -> 6b4d170)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 505df8d  [FLINK-18390][docs-zh] Translate "JSON Format" page into Chinese
     new 078fb71  [hotfix][config] Remove CheckpointConfig#enableUnalignedCheckpoints without parameters.
     new 6b4d170  [FLINK-18403][checkpointing] Ensure that unaligned checkpointing is only activated for EXACTLY_ONCE.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tests/StatefulStreamJobUpgradeTestProgram.java |  2 +-
 .../api/environment/CheckpointConfig.java          | 15 ----------
 .../api/graph/StreamingJobGraphGenerator.java      |  7 ++++-
 .../streaming/runtime/io/InputProcessorUtil.java   |  4 +++
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 32 ++++++++++++++++++++++
 .../checkpointing/UnalignedCheckpointITCase.java   |  2 +-
 6 files changed, 44 insertions(+), 18 deletions(-)


[flink] 01/02: [hotfix][config] Remove CheckpointConfig#enableUnalignedCheckpoints without parameters.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 078fb716b1ffea7d7143bf68437b9406cb2fb56c
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Mon Jun 22 11:41:22 2020 +0200

    [hotfix][config] Remove CheckpointConfig#enableUnalignedCheckpoints without parameters.
    
    Using CheckpointConfig#enableUnalignedCheckpoints(boolean) makes the intent explicit and is also more future-proof: once unaligned checkpoints become the default, the parameterless #enableUnalignedCheckpoints() would be mostly useless, and we would need to add a #disableUnalignedCheckpoints() for consistency.
---
 .../tests/StatefulStreamJobUpgradeTestProgram.java        |  2 +-
 .../flink/streaming/api/environment/CheckpointConfig.java | 15 ---------------
 .../test/checkpointing/UnalignedCheckpointITCase.java     |  2 +-
 3 files changed, 2 insertions(+), 17 deletions(-)

diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
index aa94578..df7321c 100644
--- a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
@@ -74,7 +74,7 @@ public class StatefulStreamJobUpgradeTestProgram {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		setupEnvironment(env, pt);
-		env.getCheckpointConfig().enableUnalignedCheckpoints();
+		env.getCheckpointConfig().enableUnalignedCheckpoints(true);
 
 		if (isOriginalJobVariant(pt)) {
 			executeOriginalVariant(env, pt);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index e3fe262..7d23a05 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -399,21 +399,6 @@ public class CheckpointConfig implements java.io.Serializable {
 	}
 
 	/**
-	 * Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.
-	 *
-	 * <p>Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which allows
-	 * checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the
-	 * current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.
-	 *
-	 * <p>Unaligned checkpoints can only be enabled if {@link #checkpointingMode} is
-	 * {@link CheckpointingMode#EXACTLY_ONCE}.
-	 */
-	@PublicEvolving
-	public void enableUnalignedCheckpoints() {
-		enableUnalignedCheckpoints(true);
-	}
-
-	/**
 	 * Returns whether checkpoints should be persisted externally.
 	 *
 	 * @return <code>true</code> if checkpoints should be externalized.
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 6211077..9102bd9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -182,7 +182,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
 		env.enableCheckpointing(100);
 		// keep in sync with FailingMapper in #createDAG
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(100)));
-		env.getCheckpointConfig().enableUnalignedCheckpoints();
+		env.getCheckpointConfig().enableUnalignedCheckpoints(true);
 		return env;
 	}
 


[flink] 02/02: [FLINK-18403][checkpointing] Ensure that unaligned checkpointing is only activated for EXACTLY_ONCE.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6b4d170b7872b1fe3451ebe7b675a12f3343a82a
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Mon Jun 22 11:51:42 2020 +0200

    [FLINK-18403][checkpointing] Ensure that unaligned checkpointing is only activated for EXACTLY_ONCE.
---
 .../api/graph/StreamingJobGraphGenerator.java      |  7 ++++-
 .../streaming/runtime/io/InputProcessorUtil.java   |  4 +++
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 32 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 9022eaa..0e0b4b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -224,6 +224,11 @@ public class StreamingJobGraphGenerator {
 				}
 			}
 		}
+
+		if (checkpointConfig.isUnalignedCheckpointsEnabled() && getCheckpointingMode(checkpointConfig) != CheckpointingMode.EXACTLY_ONCE) {
+			LOG.warn("Unaligned checkpoints can only be used with checkpointing mode EXACTLY_ONCE");
+			checkpointConfig.enableUnalignedCheckpoints(false);
+		}
 	}
 
 	private void setPhysicalEdges() {
@@ -500,8 +505,8 @@ public class StreamingJobGraphGenerator {
 
 		config.setStateBackend(streamGraph.getStateBackend());
 		config.setCheckpointingEnabled(checkpointCfg.isCheckpointingEnabled());
-		config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
 		config.setCheckpointMode(getCheckpointingMode(checkpointCfg));
+		config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
 
 		for (int i = 0; i < vertex.getStatePartitioners().length; i++) {
 			config.setStatePartitioner(i, vertex.getStatePartitioners()[i]);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 3ed8584..4969edc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -111,6 +111,10 @@ public class InputProcessorUtil {
 				}
 				return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates);
 			case AT_LEAST_ONCE:
+				if (config.isUnalignedCheckpointsEnabled()) {
+					throw new IllegalStateException("Cannot use unaligned checkpoints with AT_LEAST_ONCE " +
+						"checkpointing mode");
+				}
 				int numInputChannels = Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum();
 				return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
 			default:
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index eca6883..42edc70 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -193,6 +193,38 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	}
 
 	@Test
+	public void testEnabledUnalignedCheckAndDisabledCheckpointing() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(0).print();
+		StreamGraph streamGraph = env.getStreamGraph();
+		assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
+		env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
+		assertEquals(CheckpointingMode.AT_LEAST_ONCE, streamConfig.getCheckpointMode());
+		assertFalse(streamConfig.isUnalignedCheckpointsEnabled());
+	}
+
+	@Test
+	public void testUnalignedCheckAndAtLeastOnce() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(0).print();
+		StreamGraph streamGraph = env.getStreamGraph();
+		env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
+		env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
+		assertEquals(CheckpointingMode.AT_LEAST_ONCE, streamConfig.getCheckpointMode());
+		assertFalse(streamConfig.isUnalignedCheckpointsEnabled());
+	}
+
+	@Test
 	public void generatorForwardsSavepointRestoreSettings() {
 		StreamGraph streamGraph = new StreamGraph(
 				new ExecutionConfig(),