You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/17 06:18:00 UTC
[2/5] flink git commit: [FLINK-6284] Correct sorting of completed
checkpoints in ZooKeeperStateHandleStore
[FLINK-6284] Correct sorting of completed checkpoints in ZooKeeperStateHandleStore
In order to store completed checkpoints in increasing order in ZooKeeper,
the paths for the completed checkpoints are now generated by
String.format("/%019d", checkpointId) instead of String.format("/%s", checkpointId).
This makes sure that the converted long always has the same length, padded with
leading zeros, so that the lexicographic order of the paths matches the numeric
order of the checkpoint ids.
Fix failing ZooKeeperCompletedCheckpointStoreITCase
This closes #3884.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/827d74e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/827d74e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/827d74e6
Branch: refs/heads/release-1.3
Commit: 827d74e69386cff87576972c1b69a16b92b730ae
Parents: 9c6c965
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri May 12 14:23:37 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed May 17 08:16:54 2017 +0200
----------------------------------------------------------------------
.../ZooKeeperCompletedCheckpointStore.java | 6 ++--
...ZooKeeperCompletedCheckpointStoreITCase.java | 34 +++++++++++++++++---
2 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/827d74e6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index c8c68bc..95cfb0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -425,8 +425,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
* @param checkpointId to convert to the path
* @return Path created from the given checkpoint id
*/
- protected static String checkpointIdToPath(long checkpointId) {
- return String.format("/%s", checkpointId);
+ public static String checkpointIdToPath(long checkpointId) {
+ return String.format("/%019d", checkpointId);
}
/**
@@ -435,7 +435,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
* @param path in ZooKeeper
* @return Checkpoint id parsed from the path
*/
- protected static long pathToCheckpointId(String path) {
+ public static long pathToCheckpointId(String path) {
try {
String numberString;
http://git-wip-us.apache.org/repos/asf/flink/blob/827d74e6/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 73e0ed9..3fd7f1b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -137,11 +137,11 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
store.addCheckpoint(checkpoint);
assertEquals(1, store.getNumberOfRetainedCheckpoints());
- assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+ assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
store.shutdown(JobStatus.FINISHED);
assertEquals(0, store.getNumberOfRetainedCheckpoints());
- assertNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+ assertNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
store.recover();
@@ -161,12 +161,12 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
store.addCheckpoint(checkpoint);
assertEquals(1, store.getNumberOfRetainedCheckpoints());
- assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+ assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
store.shutdown(JobStatus.SUSPENDED);
assertEquals(0, store.getNumberOfRetainedCheckpoints());
- assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+ assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
// Recover again
store.recover();
@@ -175,6 +175,32 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
assertEquals(checkpoint, recovered);
}
+ /**
+ * FLINK-6284
+ *
+ * Tests that the latest recovered checkpoint is the one with the highest checkpoint id
+ */
+ @Test
+ public void testLatestCheckpointRecovery() throws Exception {
+ final int numCheckpoints = 3;
+ AbstractCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(numCheckpoints);
+ List<CompletedCheckpoint> checkpoints = new ArrayList<>(numCheckpoints);
+
+ checkpoints.add(createCheckpoint(9));
+ checkpoints.add(createCheckpoint(10));
+ checkpoints.add(createCheckpoint(11));
+
+ for (CompletedCheckpoint checkpoint : checkpoints) {
+ checkpointStore.addCheckpoint(checkpoint);
+ }
+
+ checkpointStore.recover();
+
+ CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint();
+
+ assertEquals(checkpoints.get(checkpoints.size() -1), latestCheckpoint);
+ }
+
static class HeapRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
private static final long serialVersionUID = -268548467968932L;