You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/13 08:39:46 UTC
flink git commit: [FLINK-5007] [checkpointing] Retain externalized checkpoint on suspension
Repository: flink
Updated Branches:
refs/heads/master 57f7747bb -> 38ab7164a
[FLINK-5007] [checkpointing] Retain externalized checkpoint on suspension
Handles job suspension like cancellation, because a graceful cluster shutdown (non-HA) suspends all jobs.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38ab7164
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38ab7164
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38ab7164
Branch: refs/heads/master
Commit: 38ab7164af6ec6e011aa489f7ebe2ed1611fee5e
Parents: 57f7747
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Nov 3 15:52:24 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Dec 13 09:39:29 2016 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/checkpoint/CheckpointProperties.java | 4 +++-
.../flink/runtime/checkpoint/CheckpointPropertiesTest.java | 4 ++--
.../flink/streaming/api/environment/CheckpointConfig.java | 5 +++--
3 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/38ab7164/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index e4856cf..68a4998 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -264,6 +264,8 @@ public class CheckpointProperties implements Serializable {
* @return Checkpoint properties for an external checkpoint.
*/
public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) {
- return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, true);
+ // Handle suspension like cancellation as graceful cluster shut down
+ // suspends all jobs (non-HA).
+ return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, deleteOnCancellation);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/38ab7164/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index c996886..11bddb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -48,7 +48,7 @@ public class CheckpointPropertiesTest {
* Tests the external checkpoints properties.
*/
@Test
- public void testPersistentCheckpointProperties() {
+ public void testExternalizedCheckpointProperties() {
CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true);
assertFalse(props.forceCheckpoint());
@@ -67,7 +67,7 @@ public class CheckpointPropertiesTest {
assertTrue(props.discardOnJobFinished());
assertFalse(props.discardOnJobCancelled());
assertFalse(props.discardOnJobFailed());
- assertTrue(props.discardOnJobSuspended());
+ assertFalse(props.discardOnJobSuspended());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/38ab7164/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 0a7f65e..eb7833a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -235,8 +235,9 @@ public class CheckpointConfig implements java.io.Serializable {
*
* <p>Externalized checkpoints write their meta data out to persistent
* storage and are <strong>not</strong> automatically cleaned up when
- * the owning job fails (terminating with job status {@link JobStatus#FAILED}).
- * In this case, you have to manually clean up the checkpoint state, both
+ * the owning job fails or is suspended (terminating with job status
+ * {@link JobStatus#FAILED} or {@link JobStatus#SUSPENDED}). In this
+ * case, you have to manually clean up the checkpoint state, both
* the meta data and actual program state.
*
* <p>The {@link ExternalizedCheckpointCleanup} mode defines how an