You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/17 06:18:00 UTC

[2/5] flink git commit: [FLINK-6284] Correct sorting of completed checkpoints in ZooKeeperStateHandleStore

[FLINK-6284] Correct sorting of completed checkpoints in ZooKeeperStateHandleStore

In order to store completed checkpoints in increasing order in ZooKeeper,
the paths for the completed checkpoints are now generated by
String.format("/%019d", checkpointId) instead of String.format("/%s", checkpointId).
This makes sure that the converted long is always zero-padded to the same
length (19 digits), so that, for example, checkpoint 9 sorts before checkpoint 10.

Fix failing ZooKeeperCompletedCheckpointStoreITCase

This closes #3884.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/827d74e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/827d74e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/827d74e6

Branch: refs/heads/release-1.3
Commit: 827d74e69386cff87576972c1b69a16b92b730ae
Parents: 9c6c965
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri May 12 14:23:37 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed May 17 08:16:54 2017 +0200

----------------------------------------------------------------------
 .../ZooKeeperCompletedCheckpointStore.java      |  6 ++--
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 34 +++++++++++++++++---
 2 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/827d74e6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index c8c68bc..95cfb0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -425,8 +425,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 	 * @param checkpointId to convert to the path
 	 * @return Path created from the given checkpoint id
 	 */
-	protected static String checkpointIdToPath(long checkpointId) {
-		return String.format("/%s", checkpointId);
+	public static String checkpointIdToPath(long checkpointId) {
+		return String.format("/%019d", checkpointId);
 	}
 
 	/**
@@ -435,7 +435,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 	 * @param path in ZooKeeper
 	 * @return Checkpoint id parsed from the path
 	 */
-	protected static long pathToCheckpointId(String path) {
+	public static long pathToCheckpointId(String path) {
 		try {
 			String numberString;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/827d74e6/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 73e0ed9..3fd7f1b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -137,11 +137,11 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		store.shutdown(JobStatus.FINISHED);
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
-		assertNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+		assertNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		store.recover();
 
@@ -161,12 +161,12 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		store.shutdown(JobStatus.SUSPENDED);
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		// Recover again
 		store.recover();
@@ -175,6 +175,32 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(checkpoint, recovered);
 	}
 
+	/**
+	 * FLINK-6284
+	 *
+	 * Tests that the latest recovered checkpoint is the one with the highest checkpoint id
+	 */
+	@Test
+	public void testLatestCheckpointRecovery() throws Exception {
+		final int numCheckpoints = 3;
+		AbstractCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(numCheckpoints);
+		List<CompletedCheckpoint> checkpoints = new ArrayList<>(numCheckpoints);
+
+		checkpoints.add(createCheckpoint(9));
+		checkpoints.add(createCheckpoint(10));
+		checkpoints.add(createCheckpoint(11));
+
+		for (CompletedCheckpoint checkpoint : checkpoints) {
+			checkpointStore.addCheckpoint(checkpoint);
+		}
+
+		checkpointStore.recover();
+
+		CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint();
+
+		assertEquals(checkpoints.get(checkpoints.size() -1), latestCheckpoint);
+	}
+
 	static class HeapRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
 
 		private static final long serialVersionUID = -268548467968932L;