You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/20 13:46:05 UTC

flink git commit: [FLINK-6640] Ensure registration of shared state happens before externalizing a checkpoint

Repository: flink
Updated Branches:
  refs/heads/master b93396c5a -> 040356391


[FLINK-6640] Ensure registration of shared state happens before externalizing a checkpoint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04035639
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04035639
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04035639

Branch: refs/heads/master
Commit: 040356391f621858199948b15160b1e382152cc1
Parents: b93396c
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri May 19 16:46:06 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sat May 20 15:45:45 2017 +0200

----------------------------------------------------------------------
 .../AbstractCompletedCheckpointStore.java       | 43 --------------
 .../checkpoint/CheckpointCoordinator.java       | 17 +++---
 .../runtime/checkpoint/CompletedCheckpoint.java |  6 +-
 .../checkpoint/CompletedCheckpointStore.java    |  5 +-
 .../runtime/checkpoint/PendingCheckpoint.java   |  1 -
 .../StandaloneCompletedCheckpointStore.java     | 11 ++--
 .../ZooKeeperCompletedCheckpointStore.java      | 26 ++++-----
 .../runtime/state/SharedStateRegistry.java      | 16 +++++-
 .../CheckpointCoordinatorFailureTest.java       |  3 +-
 .../CompletedCheckpointStoreTest.java           | 33 +++++++----
 .../checkpoint/CompletedCheckpointTest.java     | 14 ++---
 .../StandaloneCompletedCheckpointStoreTest.java | 21 +++----
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 60 ++++++++++++--------
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  4 +-
 .../RecoverableCompletedCheckpointStore.java    | 13 ++---
 .../JobManagerHACheckpointRecoveryITCase.java   | 37 +++++++-----
 16 files changed, 156 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
deleted file mode 100644
index bf70501..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.runtime.state.SharedStateRegistry;
-
-import java.util.concurrent.Executor;
-
-/**
- * This is the base class that provides implementation of some aspects common for all
- * {@link CompletedCheckpointStore}s.
- */
-public abstract class AbstractCompletedCheckpointStore implements CompletedCheckpointStore {
-
-	/**
-	 * Registry for shared states.
-	 */
-	protected final SharedStateRegistry sharedStateRegistry;
-
-	public AbstractCompletedCheckpointStore() {
-		this.sharedStateRegistry = new SharedStateRegistry();
-	}
-
-	public AbstractCompletedCheckpointStore(Executor asyncIOExecutor) {
-		this.sharedStateRegistry = new SharedStateRegistry(asyncIOExecutor);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0bbf6b5..e224780 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -40,11 +40,11 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,6 +173,9 @@ public class CheckpointCoordinator {
 	@Nullable
 	private CheckpointStatsTracker statsTracker;
 
+	/** Registry that tracks state which is shared across (incremental) checkpoints. */
+	private final SharedStateRegistry sharedStateRegistry;
+
 	// --------------------------------------------------------------------------------------------
 
 	public CheckpointCoordinator(
@@ -226,6 +229,7 @@ public class CheckpointCoordinator {
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
 		this.checkpointDirectory = checkpointDirectory;
 		this.executor = checkNotNull(executor);
+		this.sharedStateRegistry = new SharedStateRegistry(executor);
 
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 		this.masterHooks = new HashMap<>();
@@ -836,6 +840,10 @@ public class CheckpointCoordinator {
 		final long checkpointId = pendingCheckpoint.getCheckpointId();
 		final CompletedCheckpoint completedCheckpoint;
 
+		// As a first step to complete the checkpoint, we register its state with the registry
+		Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
+		sharedStateRegistry.registerAll(operatorStates.values());
+
 		try {
 			try {
 				// externalize the checkpoint if required
@@ -1003,7 +1011,7 @@ public class CheckpointCoordinator {
 			}
 
 			// Recover the checkpoints
-			completedCheckpointStore.recover();
+			completedCheckpointStore.recover(sharedStateRegistry);
 
 			// restore from the latest checkpoint
 			CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
@@ -1120,11 +1128,6 @@ public class CheckpointCoordinator {
 		return completedCheckpointStore;
 	}
 
-//	@VisibleForTesting
-//	SharedStateRegistry getSharedStateRegistry() {
-//		return sharedStateRegistry;
-//	}
-
 	public CheckpointIDCounter getCheckpointIdCounter() {
 		return checkpointIdCounter;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index b382080..7c3edee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -178,7 +178,7 @@ public class CompletedCheckpoint implements Serializable {
 		doDiscard();
 	}
 
-	public boolean discardOnSubsume(SharedStateRegistry sharedStateRegistry) throws Exception {
+	public boolean discardOnSubsume() throws Exception {
 
 		if (props.discardOnSubsumed()) {
 			doDiscard();
@@ -188,7 +188,7 @@ public class CompletedCheckpoint implements Serializable {
 		return false;
 	}
 
-	public boolean discardOnShutdown(JobStatus jobStatus, SharedStateRegistry sharedStateRegistry) throws Exception {
+	public boolean discardOnShutdown(JobStatus jobStatus) throws Exception {
 
 		if (jobStatus == JobStatus.FINISHED && props.discardOnJobFinished() ||
 				jobStatus == JobStatus.CANCELED && props.discardOnJobCancelled() ||
@@ -290,7 +290,7 @@ public class CompletedCheckpoint implements Serializable {
 	 *
 	 * @param sharedStateRegistry The registry where shared states are registered
 	 */
-	public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
+	public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateRegistry) {
 		sharedStateRegistry.registerAll(operatorStates.values());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 82193b5..45d407e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import java.util.List;
 
@@ -32,8 +33,10 @@ public interface CompletedCheckpointStore {
 	 *
 	 * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
 	 * available checkpoint.
+	 *
+	 * @param sharedStateRegistry the shared state registry to register recovered states.
 	 */
-	void recover() throws Exception;
+	void recover(SharedStateRegistry sharedStateRegistry) throws Exception;
 
 	/**
 	 * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 370032a..0633fec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 233cfc8..fbb0198 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 /**
  * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}.
  */
-public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpointStore {
+public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class);
 
@@ -56,21 +57,19 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpo
 	}
 
 	@Override
-	public void recover() throws Exception {
+	public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
 		// Nothing to do
 	}
 
 	@Override
 	public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
-		
-		checkpoint.registerSharedStates(sharedStateRegistry);
 
 		checkpoints.addLast(checkpoint);
 
 		if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
 			try {
 				CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
-				checkpointToSubsume.discardOnSubsume(sharedStateRegistry);
+				checkpointToSubsume.discardOnSubsume();
 			} catch (Exception e) {
 				LOG.warn("Fail to subsume the old checkpoint.", e);
 			}
@@ -103,7 +102,7 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpo
 			LOG.info("Shutting down");
 
 			for (CompletedCheckpoint checkpoint : checkpoints) {
-				checkpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
+				checkpoint.discardOnShutdown(jobStatus);
 			}
 		} finally {
 			checkpoints.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 4c3c1ff..469c1b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -64,7 +64,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * checkpoints is consistent. Currently, after recovery we start out with only a single
  * checkpoint to circumvent those situations.
  */
-public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpointStore {
+public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
 
@@ -102,8 +102,6 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 			RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
 			Executor executor) throws Exception {
 
-		super(executor);
-
 		checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
 		checkNotNull(stateStorage, "State storage");
 
@@ -139,13 +137,14 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 	 * that the history of checkpoints is consistent.
 	 */
 	@Override
-	public void recover() throws Exception {
+	public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
 		LOG.info("Recovering checkpoints from ZooKeeper.");
 
 		// Clear local handles in order to prevent duplicates on
 		// recovery. The local handles should reflect the state
 		// of ZooKeeper.
 		completedCheckpoints.clear();
+		sharedStateRegistry.clear();
 
 		// Get all there is first
 		List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
@@ -171,7 +170,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 				completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
 				if (completedCheckpoint != null) {
 					// Re-register all shared states in the checkpoint.
-					completedCheckpoint.registerSharedStates(sharedStateRegistry);
+					completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 					completedCheckpoints.add(completedCheckpoint);
 				}
 			} catch (Exception e) {
@@ -195,9 +194,6 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		
 		final String path = checkpointIdToPath(checkpoint.getCheckpointID());
 
-		// First, register all shared states in the checkpoint to consolidates placeholder.
-		checkpoint.registerSharedStates(sharedStateRegistry);
-
 		// Now add the new one. If it fails, we don't want to lose existing data.
 		checkpointsInZooKeeper.addAndLock(path, checkpoint);
 
@@ -206,7 +202,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		// Everything worked, let's remove a previous checkpoint if necessary.
 		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
 			try {
-				removeSubsumed(completedCheckpoints.removeFirst(), sharedStateRegistry);
+				removeSubsumed(completedCheckpoints.removeFirst());
 			} catch (Exception e) {
 				LOG.warn("Failed to subsume the old checkpoint", e);
 			}
@@ -248,7 +244,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 			for (CompletedCheckpoint checkpoint : completedCheckpoints) {
 				try {
-					removeShutdown(checkpoint, jobStatus, sharedStateRegistry);
+					removeShutdown(checkpoint, jobStatus);
 				} catch (Exception e) {
 					LOG.error("Failed to discard checkpoint.", e);
 				}
@@ -274,8 +270,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 	// ------------------------------------------------------------------------
 
 	private void removeSubsumed(
-		final CompletedCheckpoint completedCheckpoint,
-		final SharedStateRegistry sharedStateRegistry) throws Exception {
+		final CompletedCheckpoint completedCheckpoint) throws Exception {
 
 		if(completedCheckpoint == null) {
 			return;
@@ -287,7 +282,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 				public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
 					if (value != null) {
 						try {
-							completedCheckpoint.discardOnSubsume(sharedStateRegistry);
+							completedCheckpoint.discardOnSubsume();
 						} catch (Exception e) {
 							throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
 						}
@@ -302,8 +297,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 	private void removeShutdown(
 			final CompletedCheckpoint completedCheckpoint,
-			final JobStatus jobStatus,
-			final SharedStateRegistry sharedStateRegistry) throws Exception {
+			final JobStatus jobStatus) throws Exception {
 
 		if(completedCheckpoint == null) {
 			return;
@@ -313,7 +307,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 			@Override
 			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
 				try {
-					completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
+					completedCheckpoint.discardOnShutdown(jobStatus);
 				} catch (Exception e) {
 					throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index a5e0f84..949839b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -29,10 +29,13 @@ import java.util.Objects;
 import java.util.concurrent.Executor;
 
 /**
+ * This registry manages state that is shared across (incremental) checkpoints, and is responsible
+ * for deleting shared state that is no longer used in any valid checkpoint.
+ *
  * A {@code SharedStateRegistry} will be deployed in the 
- * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
- * maintain the reference count of {@link StreamStateHandle}s which are shared
- * among different incremental checkpoints.
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of {@link StreamStateHandle}s by a key that (logically) identifies
+ * them.
  */
 public class SharedStateRegistry {
 
@@ -247,4 +250,11 @@ public class SharedStateRegistry {
 			}
 		}
 	}
+
+	/**
+	 * Clears the registry.
+	 */
+	public void clear() {
+		registeredStates.clear();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 6e20be3..344b340 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -135,7 +136,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 	private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
 
 		@Override
-		public void recover() throws Exception {
+		public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
 			throw new UnsupportedOperationException("Not implemented.");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index fb5d7c3..1fe4e65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -64,6 +64,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	 */
 	@Test
 	public void testAddAndGetLatestCheckpoint() throws Exception {
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4);
 		
 		// Empty state
@@ -71,7 +72,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		assertEquals(0, checkpoints.getAllCheckpoints().size());
 
 		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
-				createCheckpoint(0), createCheckpoint(1) };
+				createCheckpoint(0, sharedStateRegistry), createCheckpoint(1, sharedStateRegistry) };
 
 		// Add and get latest
 		checkpoints.addCheckpoint(expected[0]);
@@ -89,11 +90,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	 */
 	@Test
 	public void testAddCheckpointMoreThanMaxRetained() throws Exception {
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1);
 
 		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
-				createCheckpoint(0), createCheckpoint(1),
-				createCheckpoint(2), createCheckpoint(3)
+				createCheckpoint(0, sharedStateRegistry), createCheckpoint(1, sharedStateRegistry),
+				createCheckpoint(2, sharedStateRegistry), createCheckpoint(3, sharedStateRegistry)
 		};
 
 		// Add checkpoints
@@ -134,11 +136,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	 */
 	@Test
 	public void testGetAllCheckpoints() throws Exception {
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4);
 
 		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
-				createCheckpoint(0), createCheckpoint(1),
-				createCheckpoint(2), createCheckpoint(3)
+				createCheckpoint(0, sharedStateRegistry), createCheckpoint(1, sharedStateRegistry),
+				createCheckpoint(2, sharedStateRegistry), createCheckpoint(3, sharedStateRegistry)
 		};
 
 		for (TestCompletedCheckpoint checkpoint : expected) {
@@ -159,11 +162,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	 */
 	@Test
 	public void testDiscardAllCheckpoints() throws Exception {
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4);
 
 		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
-				createCheckpoint(0), createCheckpoint(1),
-				createCheckpoint(2), createCheckpoint(3)
+				createCheckpoint(0, sharedStateRegistry), createCheckpoint(1, sharedStateRegistry),
+				createCheckpoint(2, sharedStateRegistry), createCheckpoint(3, sharedStateRegistry)
 		};
 
 		for (TestCompletedCheckpoint checkpoint : expected) {
@@ -187,7 +191,10 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 
 	// ---------------------------------------------------------------------------------------------
 
-	protected TestCompletedCheckpoint createCheckpoint(int id) throws IOException {
+	protected TestCompletedCheckpoint createCheckpoint(
+		int id,
+		SharedStateRegistry sharedStateRegistry) throws IOException {
+
 		int numberOfStates = 4;
 		CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
 
@@ -204,6 +211,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			operatorState.putState(i, subtaskState);
 		}
 
+		operatorState.registerSharedStates(sharedStateRegistry);
+
 		return new TestCompletedCheckpoint(new JobID(), id, 0, operatorGroupState, props);
 	}
 
@@ -251,8 +260,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		}
 
 		@Override
-		public boolean discardOnSubsume(SharedStateRegistry sharedStateRegistry) throws Exception {
-			if (super.discardOnSubsume(sharedStateRegistry)) {
+		public boolean discardOnSubsume() throws Exception {
+			if (super.discardOnSubsume()) {
 				discard();
 				return true;
 			} else {
@@ -261,8 +270,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		}
 
 		@Override
-		public boolean discardOnShutdown(JobStatus jobStatus, SharedStateRegistry sharedStateRegistry) throws Exception {
-			if (super.discardOnShutdown(jobStatus, sharedStateRegistry)) {
+		public boolean discardOnShutdown(JobStatus jobStatus) throws Exception {
+			if (super.discardOnShutdown(jobStatus)) {
 				discard();
 				return true;
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 0bbb961..4846879 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -67,7 +67,7 @@ public class CompletedCheckpointTest {
 				new FileStateHandle(new Path(file.toURI()), file.length()),
 				file.getAbsolutePath());
 
-		checkpoint.discardOnShutdown(JobStatus.FAILED, new SharedStateRegistry());
+		checkpoint.discardOnShutdown(JobStatus.FAILED);
 
 		assertEquals(false, file.exists());
 	}
@@ -93,11 +93,11 @@ public class CompletedCheckpointTest {
 				null);
 
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-		checkpoint.registerSharedStates(sharedStateRegistry);
+		checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 		verify(state, times(1)).registerSharedStates(sharedStateRegistry);
 
 		// Subsume
-		checkpoint.discardOnSubsume(sharedStateRegistry);
+		checkpoint.discardOnSubsume();
 
 		verify(state, times(1)).discardState();
 	}
@@ -132,9 +132,9 @@ public class CompletedCheckpointTest {
 					externalPath);
 
 			SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-			checkpoint.registerSharedStates(sharedStateRegistry);
+			checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 
-			checkpoint.discardOnShutdown(status, sharedStateRegistry);
+			checkpoint.discardOnShutdown(status);
 			verify(state, times(0)).discardState();
 			assertEquals(true, file.exists());
 
@@ -148,7 +148,7 @@ public class CompletedCheckpointTest {
 					null,
 					null);
 
-			checkpoint.discardOnShutdown(status, sharedStateRegistry);
+			checkpoint.discardOnShutdown(status);
 			verify(state, times(1)).discardState();
 		}
 	}
@@ -176,7 +176,7 @@ public class CompletedCheckpointTest {
 		CompletedCheckpointStats.DiscardCallback callback = mock(CompletedCheckpointStats.DiscardCallback.class);
 		completed.setDiscardCallback(callback);
 
-		completed.discardOnShutdown(JobStatus.FINISHED, new SharedStateRegistry());
+		completed.discardOnShutdown(JobStatus.FINISHED);
 		verify(callback, times(1)).notifyDiscardedCheckpoint();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index be94762..6f3c60b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -30,7 +30,6 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
 import static org.powermock.api.mockito.PowerMockito.doReturn;
 import static org.powermock.api.mockito.PowerMockito.doThrow;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -41,7 +40,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointStoreTest {
 
 	@Override
-	protected AbstractCompletedCheckpointStore createCompletedCheckpoints(
+	protected CompletedCheckpointStore createCompletedCheckpoints(
 			int maxNumberOfCheckpointsToRetain) throws Exception {
 
 		return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
@@ -52,13 +51,14 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 	 */
 	@Test
 	public void testShutdownDiscardsCheckpoints() throws Exception {
-		AbstractCompletedCheckpointStore store = createCompletedCheckpoints(1);
-		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		CompletedCheckpointStore store = createCompletedCheckpoints(1);
+		TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry);
 		Collection<OperatorState> operatorStates = checkpoint.getOperatorStates().values();
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		verifyCheckpointRegistered(operatorStates, store.sharedStateRegistry);
+		verifyCheckpointRegistered(operatorStates, sharedStateRegistry);
 
 		store.shutdown(JobStatus.FINISHED);
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
@@ -72,13 +72,14 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 	 */
 	@Test
 	public void testSuspendDiscardsCheckpoints() throws Exception {
-		AbstractCompletedCheckpointStore store = createCompletedCheckpoints(1);
-		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		CompletedCheckpointStore store = createCompletedCheckpoints(1);
+		TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry);
 		Collection<OperatorState> taskStates = checkpoint.getOperatorStates().values();
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		verifyCheckpointRegistered(taskStates, store.sharedStateRegistry);
+		verifyCheckpointRegistered(taskStates, sharedStateRegistry);
 
 		store.shutdown(JobStatus.SUSPENDED);
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
@@ -92,7 +93,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 	 */
 	@Test
 	public void testAddCheckpointWithFailedRemove() throws Exception {
-		
+
 		final int numCheckpointsToRetain = 1;
 		CompletedCheckpointStore store = createCompletedCheckpoints(numCheckpointsToRetain);
 		
@@ -100,7 +101,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 			CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
 			doReturn(i).when(checkpointToAdd).getCheckpointID();
 			doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-			doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume(any(SharedStateRegistry.class));
+			doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume();
 			
 			try {
 				store.addCheckpoint(checkpointToAdd);

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 44c802b..81ee4f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.zookeeper.data.Stat;
@@ -34,7 +35,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
@@ -82,10 +82,14 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 	 */
 	@Test
 	public void testRecover() throws Exception {
-		AbstractCompletedCheckpointStore checkpoints = createCompletedCheckpoints(3);
 
-		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
-				createCheckpoint(0), createCheckpoint(1), createCheckpoint(2)
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		CompletedCheckpointStore checkpoints = createCompletedCheckpoints(3);
+
+		TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[]{
+			createCheckpoint(0, sharedStateRegistry),
+			createCheckpoint(1, sharedStateRegistry),
+			createCheckpoint(2, sharedStateRegistry)
 		};
 
 		// Add multiple checkpoints
@@ -93,16 +97,17 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		checkpoints.addCheckpoint(expected[1]);
 		checkpoints.addCheckpoint(expected[2]);
 
-		verifyCheckpointRegistered(expected[0].getOperatorStates().values(), checkpoints.sharedStateRegistry);
-		verifyCheckpointRegistered(expected[1].getOperatorStates().values(), checkpoints.sharedStateRegistry);
-		verifyCheckpointRegistered(expected[2].getOperatorStates().values(), checkpoints.sharedStateRegistry);
+		verifyCheckpointRegistered(expected[0].getOperatorStates().values(), sharedStateRegistry);
+		verifyCheckpointRegistered(expected[1].getOperatorStates().values(), sharedStateRegistry);
+		verifyCheckpointRegistered(expected[2].getOperatorStates().values(), sharedStateRegistry);
 
 		// All three should be in ZK
 		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
 		// Recover
-		checkpoints.recover();
+		sharedStateRegistry.clear();
+		checkpoints.recover(sharedStateRegistry);
 
 		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
@@ -111,7 +116,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		List<CompletedCheckpoint> expectedCheckpoints = new ArrayList<>(3);
 		expectedCheckpoints.add(expected[1]);
 		expectedCheckpoints.add(expected[2]);
-		expectedCheckpoints.add(createCheckpoint(3));
+		expectedCheckpoints.add(createCheckpoint(3, sharedStateRegistry));
 
 		checkpoints.addCheckpoint(expectedCheckpoints.get(2));
 
@@ -120,7 +125,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(expectedCheckpoints, actualCheckpoints);
 
 		for (CompletedCheckpoint actualCheckpoint : actualCheckpoints) {
-			verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values(), checkpoints.sharedStateRegistry);
+			verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values(), sharedStateRegistry);
 		}
 	}
 
@@ -131,8 +136,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 	public void testShutdownDiscardsCheckpoints() throws Exception {
 		CuratorFramework client = ZOOKEEPER.getClient();
 
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		CompletedCheckpointStore store = createCompletedCheckpoints(1);
-		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
+		TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry);
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
@@ -142,7 +148,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 		assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
-		store.recover();
+		sharedStateRegistry.clear();
+		store.recover(sharedStateRegistry);
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 	}
@@ -156,8 +163,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 	public void testSuspendKeepsCheckpoints() throws Exception {
 		CuratorFramework client = ZOOKEEPER.getClient();
 
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		CompletedCheckpointStore store = createCompletedCheckpoints(1);
-		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
+		TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry);
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
@@ -174,7 +182,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals("The checkpoint node should not be locked.", 0, stat.getNumChildren());
 
 		// Recover again
-		store.recover();
+		sharedStateRegistry.clear();
+		store.recover(sharedStateRegistry);
 
 		CompletedCheckpoint recovered = store.getLatestCheckpoint();
 		assertEquals(checkpoint, recovered);
@@ -188,18 +197,20 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 	@Test
 	public void testLatestCheckpointRecovery() throws Exception {
 		final int numCheckpoints = 3;
-		AbstractCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(numCheckpoints);
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		CompletedCheckpointStore checkpointStore = createCompletedCheckpoints(numCheckpoints);
 		List<CompletedCheckpoint> checkpoints = new ArrayList<>(numCheckpoints);
 
-		checkpoints.add(createCheckpoint(9));
-		checkpoints.add(createCheckpoint(10));
-		checkpoints.add(createCheckpoint(11));
+		checkpoints.add(createCheckpoint(9, sharedStateRegistry));
+		checkpoints.add(createCheckpoint(10, sharedStateRegistry));
+		checkpoints.add(createCheckpoint(11, sharedStateRegistry));
 
 		for (CompletedCheckpoint checkpoint : checkpoints) {
 			checkpointStore.addCheckpoint(checkpoint);
 		}
 
-		checkpointStore.recover();
+		sharedStateRegistry.clear();
+		checkpointStore.recover(sharedStateRegistry);
 
 		CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint();
 
@@ -220,13 +231,16 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		ZooKeeperCompletedCheckpointStore zkCheckpointStore1 = createCompletedCheckpoints(numberOfCheckpoints);
 		ZooKeeperCompletedCheckpointStore zkCheckpointStore2 = createCompletedCheckpoints(numberOfCheckpoints);
 
-		TestCompletedCheckpoint completedCheckpoint = createCheckpoint(1);
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+
+		TestCompletedCheckpoint completedCheckpoint = createCheckpoint(1, sharedStateRegistry);
 
 		// complete the first checkpoint
 		zkCheckpointStore1.addCheckpoint(completedCheckpoint);
 
 		// recover the checkpoint by a different checkpoint store
-		zkCheckpointStore2.recover();
+		sharedStateRegistry.clear();
+		zkCheckpointStore2.recover(sharedStateRegistry);
 
 		CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint();
 		assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint);
@@ -237,7 +251,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 
 		// complete another checkpoint --> this should remove the first checkpoint from the store
 		// because the number of retained checkpoints == 1
-		TestCompletedCheckpoint completedCheckpoint2 = createCheckpoint(2);
+		TestCompletedCheckpoint completedCheckpoint2 = createCheckpoint(2, sharedStateRegistry);
 		zkCheckpointStore1.addCheckpoint(completedCheckpoint2);
 
 		List<CompletedCheckpoint> allCheckpoints = zkCheckpointStore1.getAllCheckpoints();
@@ -251,7 +265,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		// check that we have not discarded the first completed checkpoint
 		assertFalse(recoveredTestCheckpoint.isDiscarded());
 
-		TestCompletedCheckpoint completedCheckpoint3 = createCheckpoint(3);
+		TestCompletedCheckpoint completedCheckpoint3 = createCheckpoint(3, sharedStateRegistry);
 
 		// this should release the last lock on completedCheckoint and thus discard it
 		zkCheckpointStore2.addCheckpoint(completedCheckpoint3);

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 7d22d8e..23cc8c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -27,6 +27,7 @@ import org.apache.curator.utils.EnsurePath;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.TestLogger;
@@ -158,7 +159,8 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 			stateStorage,
 			Executors.directExecutor());
 
-		zooKeeperCompletedCheckpointStore.recover();
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
 
 		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index 2251e46..a0c4412 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.testutils;
 
-import org.apache.flink.runtime.checkpoint.AbstractCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,7 +33,7 @@ import java.util.List;
  * A checkpoint store, which supports shutdown and suspend. You can use this to test HA
  * as long as the factory always returns the same store instance.
  */
-public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckpointStore {
+public class RecoverableCompletedCheckpointStore implements CompletedCheckpointStore {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RecoverableCompletedCheckpointStore.class);
 
@@ -41,26 +42,24 @@ public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckp
 	private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2);
 
 	@Override
-	public void recover() throws Exception {
+	public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
 		checkpoints.addAll(suspended);
 		suspended.clear();
 
 		for (CompletedCheckpoint checkpoint : checkpoints) {
-			checkpoint.registerSharedStates(sharedStateRegistry);
+			checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 		}
 	}
 
 	@Override
 	public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
 
-		checkpoint.registerSharedStates(sharedStateRegistry);
-
 		checkpoints.addLast(checkpoint);
 
 
 		if (checkpoints.size() > 1) {
 			CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
-			checkpointToSubsume.discardOnSubsume(sharedStateRegistry);
+			checkpointToSubsume.discardOnSubsume();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index f9af603..33c3454 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -62,6 +62,7 @@ import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -147,6 +148,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 	private static final int Parallelism = 8;
 
 	private static CountDownLatch CompletedCheckpointsLatch = new CountDownLatch(4);
+	private static CountDownLatch CompletedCheckpointsLatch2 = new CountDownLatch(6);
 
 	private static AtomicLongArray RecoveredStates = new AtomicLongArray(Parallelism);
 
@@ -171,8 +173,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 		final String fileStateBackendPath = FileStateBackendBasePath.getAbsoluteFile().toString();
 
 		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-				zooKeeperQuorum,
-				fileStateBackendPath);
+			zooKeeperQuorum,
+			fileStateBackendPath);
 
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
 
@@ -188,7 +190,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 		try {
 			// Test actor system
 			testActorSystem = AkkaUtils.createActorSystem(new Configuration(),
-					new Some<>(new Tuple2<String, Object>("localhost", 0)));
+				new Some<>(new Tuple2<String, Object>("localhost", 0)));
 
 			// The job managers
 			jobManagerProcess[0] = new JobManagerProcess(0, config);
@@ -204,7 +206,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 			// The task manager
 			taskManagerSystem = AkkaUtils.createActorSystem(
-					config, Option.apply(new Tuple2<String, Object>("localhost", 0)));
+				config, Option.apply(new Tuple2<String, Object>("localhost", 0)));
 			TaskManager.startTaskManagerComponentsAndActor(
 				config,
 				ResourceID.generate(),
@@ -223,7 +225,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 			// Get the leader ref
 			ActorRef leaderRef = AkkaUtils.getActorRef(
-					leaderAddress, testActorSystem, testDeadline.timeLeft());
+				leaderAddress, testActorSystem, testDeadline.timeLeft());
 			ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
 
 			// Who's the boss?
@@ -248,10 +250,10 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 			// Wait for the job to be running
 			JobManagerActorTestUtils.waitForJobStatus(
-					jobGraph.getJobID(),
-					JobStatus.RUNNING,
-					leader,
-					testDeadline.timeLeft());
+				jobGraph.getJobID(),
+				JobStatus.RUNNING,
+				leader,
+				testDeadline.timeLeft());
 
 			// Remove all files
 			FileUtils.deleteDirectory(FileStateBackendBasePath);
@@ -268,7 +270,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 				if (output != null) {
 					if (output.contains("Failed to recover job") &&
-							output.contains("java.io.FileNotFoundException")) {
+						output.contains("java.io.FileNotFoundException")) {
 
 						success = true;
 						break;
@@ -352,10 +354,12 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 		try {
 			Configuration config = new Configuration();
+
 			config.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+			config.setString(ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, temporaryFolder.newFolder().toString());
 
 
 			String tmpFolderString = temporaryFolder.newFolder().toString();
@@ -372,6 +376,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(Parallelism);
 			env.enableCheckpointing(checkpointingInterval);
+			env.getCheckpointConfig()
+				.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
 			//TODO parameterize
 			env.setStateBackend(stateBackend);
@@ -427,7 +433,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 	 * A checkpointed source, which emits elements from 0 to a configured number.
 	 */
 	public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long>
-			implements ListCheckpointed<Tuple2<Long, Integer>> {
+		implements ListCheckpointed<Tuple2<Long, Integer>> {
 
 		private static final Logger LOG = LoggerFactory.getLogger(CheckpointedSequenceSource.class);
 
@@ -465,8 +471,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 						--repeat;
 						current = 0;
 					} else {
-						ctx.collect(LastElement);
-						return;
+						isRunning = false;
 					}
 				}
 
@@ -475,6 +480,11 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 					Thread.sleep(50);
 				}
 			}
+
+			CompletedCheckpointsLatch2.await();
+			synchronized (ctx.getCheckpointLock()) {
+				ctx.collect(LastElement);
+			}
 		}
 
 		@Override
@@ -563,6 +573,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 		public void notifyCheckpointComplete(long checkpointId) throws Exception {
 			LOG.debug("Checkpoint {} completed.", checkpointId);
 			CompletedCheckpointsLatch.countDown();
+			CompletedCheckpointsLatch2.countDown();
 		}
 	}