You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/13 08:39:46 UTC

flink git commit: [FLINK-5007] [checkpointing] Retain externalized checkpoint on suspension

Repository: flink
Updated Branches:
  refs/heads/master 57f7747bb -> 38ab7164a


[FLINK-5007] [checkpointing] Retain externalized checkpoint on suspension

Handles graceful cluster shut down (non-HA) like cancellation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38ab7164
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38ab7164
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38ab7164

Branch: refs/heads/master
Commit: 38ab7164af6ec6e011aa489f7ebe2ed1611fee5e
Parents: 57f7747
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Nov 3 15:52:24 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Dec 13 09:39:29 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/checkpoint/CheckpointProperties.java   | 4 +++-
 .../flink/runtime/checkpoint/CheckpointPropertiesTest.java      | 4 ++--
 .../flink/streaming/api/environment/CheckpointConfig.java       | 5 +++--
 3 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/38ab7164/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index e4856cf..68a4998 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -264,6 +264,8 @@ public class CheckpointProperties implements Serializable {
 	 * @return Checkpoint properties for an external checkpoint.
 	 */
 	public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) {
-		return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, true);
+		// Handle suspension like cancellation as graceful cluster shut down
+		// suspends all jobs (non-HA).
+		return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, deleteOnCancellation);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/38ab7164/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index c996886..11bddb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -48,7 +48,7 @@ public class CheckpointPropertiesTest {
 	 * Tests the external checkpoints properties.
 	 */
 	@Test
-	public void testPersistentCheckpointProperties() {
+	public void testExternalizedCheckpointProperties() {
 		CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true);
 
 		assertFalse(props.forceCheckpoint());
@@ -67,7 +67,7 @@ public class CheckpointPropertiesTest {
 		assertTrue(props.discardOnJobFinished());
 		assertFalse(props.discardOnJobCancelled());
 		assertFalse(props.discardOnJobFailed());
-		assertTrue(props.discardOnJobSuspended());
+		assertFalse(props.discardOnJobSuspended());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/38ab7164/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 0a7f65e..eb7833a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -235,8 +235,9 @@ public class CheckpointConfig implements java.io.Serializable {
 	 *
 	 * <p>Externalized checkpoints write their meta data out to persistent
 	 * storage and are <strong>not</strong> automatically cleaned up when
-	 * the owning job fails (terminating with job status {@link JobStatus#FAILED}).
-	 * In this case, you have to manually clean up the checkpoint state, both
+	 * the owning job fails or is suspended (terminating with job status
+	 * {@link JobStatus#FAILED} or {@link JobStatus#SUSPENDED}). In this
+	 * case, you have to manually clean up the checkpoint state, both
 	 * the meta data and actual program state.
 	 *
 	 * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an