You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/19 08:58:15 UTC

[2/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore

[FLINK-6633] Register shared state before adding to CompletedCheckpointStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0162543a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0162543a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0162543a

Branch: refs/heads/master
Commit: 0162543ac13f048ef67a6586d8a6e8021ec9dcd4
Parents: 3d119e1
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue May 16 12:32:05 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri May 19 10:57:32 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  56 +--
 .../state/RocksDBStateBackendTest.java          |  88 ++++-
 .../runtime/checkpoint/CompletedCheckpoint.java | 144 ++-----
 .../flink/runtime/checkpoint/OperatorState.java |   7 -
 .../checkpoint/OperatorSubtaskState.java        |  11 -
 .../StandaloneCompletedCheckpointStore.java     |   4 +-
 .../flink/runtime/checkpoint/SubtaskState.java  |  11 -
 .../flink/runtime/checkpoint/TaskState.java     |   7 -
 .../ZooKeeperCompletedCheckpointStore.java      | 149 +++-----
 .../savepoint/SavepointV2Serializer.java        |  17 +-
 .../runtime/state/CompositeStateHandle.java     |  15 +-
 .../state/IncrementalKeyedStateHandle.java      | 171 ++++-----
 .../runtime/state/KeyGroupsStateHandle.java     |   5 -
 .../state/PlaceholderStreamStateHandle.java     |  44 +--
 .../runtime/state/SharedStateRegistry.java      |  54 +--
 .../state/memory/ByteStreamStateHandle.java     |   7 +
 .../checkpoint/CheckpointCoordinatorTest.java   |  25 --
 .../CompletedCheckpointStoreTest.java           |  61 +--
 .../checkpoint/CompletedCheckpointTest.java     |   3 -
 .../checkpoint/PendingCheckpointTest.java       |   1 -
 ...ZooKeeperCompletedCheckpointStoreITCase.java |   7 +-
 .../savepoint/CheckpointTestUtils.java          |  25 +-
 .../state/IncrementalKeyedStateHandleTest.java  | 206 ++++++++++
 .../runtime/state/SharedStateRegistryTest.java  |  14 +-
 .../runtime/state/StateBackendTestBase.java     |   2 -
 .../RecoverableCompletedCheckpointStore.java    |   5 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   9 +-
 .../JobManagerHACheckpointRecoveryITCase.java   | 375 +++++++++----------
 28 files changed, 747 insertions(+), 776 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 88a759d..1f32a89 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -105,6 +105,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -170,8 +171,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** True if incremental checkpointing is enabled */
 	private final boolean enableIncrementalCheckpointing;
 
-	/** The sst files materialized in pending checkpoints */
-	private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles = new TreeMap<>();
+	/** The state handle ids of all sst files materialized in snapshots for previous checkpoints */
+	private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();
 
 	/** The identifier of the last completed checkpoint */
 	private long lastCompletedCheckpointId = -1;
@@ -720,7 +721,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private final long checkpointTimestamp;
 
 		/** All sst files that were part of the last previously completed checkpoint */
-		private Map<StateHandleID, StreamStateHandle> baseSstFiles;
+		private Set<StateHandleID> baseSstFiles;
 
 		/** The state meta data */
 		private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
@@ -732,10 +733,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private final CloseableRegistry closeableRegistry = new CloseableRegistry();
 
 		// new sst files since the last completed checkpoint
-		private final Map<StateHandleID, StreamStateHandle> newSstFiles = new HashMap<>();
-
-		// old sst files which have been materialized in previous completed checkpoints
-		private final Map<StateHandleID, StreamStateHandle> oldSstFiles = new HashMap<>();
+		private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
 
 		// handles to the misc files in the current snapshot
 		private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
@@ -830,7 +828,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			// use the last completed checkpoint as the comparison base.
 			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
 
-
 			// save meta data
 			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
 					: stateBackend.kvStateInformation.entrySet()) {
@@ -867,18 +864,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					final StateHandleID stateHandleID = new StateHandleID(fileName);
 
 					if (fileName.endsWith(SST_FILE_SUFFIX)) {
-						StreamStateHandle fileHandle =
-							baseSstFiles == null ? null : baseSstFiles.get(fileName);
+						final boolean existsAlready =
+							baseSstFiles == null ? false : baseSstFiles.contains(stateHandleID);
 
-						if (fileHandle == null) {
-							fileHandle = materializeStateData(filePath);
-							newSstFiles.put(stateHandleID, fileHandle);
-						} else {
+						if (existsAlready) {
 							// we introduce a placeholder state handle, that is replaced with the
 							// original from the shared state registry (created from a previous checkpoint)
-							oldSstFiles.put(
+							sstFiles.put(
 								stateHandleID,
-								new PlaceholderStreamStateHandle(fileHandle.getStateSize()));
+								new PlaceholderStreamStateHandle());
+						} else {
+							sstFiles.put(stateHandleID, materializeStateData(filePath));
 						}
 					} else {
 						StreamStateHandle fileHandle = materializeStateData(filePath);
@@ -887,22 +883,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				}
 			}
 
-			Map<StateHandleID, StreamStateHandle> sstFiles =
-				new HashMap<>(newSstFiles.size() + oldSstFiles.size());
 
-			sstFiles.putAll(newSstFiles);
-			sstFiles.putAll(oldSstFiles);
 
 			synchronized (stateBackend.asyncSnapshotLock) {
-				stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+				stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
 			}
 
 			return new IncrementalKeyedStateHandle(
 				stateBackend.operatorIdentifier,
 				stateBackend.keyGroupRange,
 				checkpointId,
-				newSstFiles,
-				oldSstFiles,
+				sstFiles,
 				miscFiles,
 				metaStateHandle);
 		}
@@ -933,7 +924,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				statesToDiscard.add(metaStateHandle);
 				statesToDiscard.addAll(miscFiles.values());
-				statesToDiscard.addAll(newSstFiles.values());
+				statesToDiscard.addAll(sstFiles.values());
 
 				try {
 					StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
@@ -1308,15 +1299,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				UUID.randomUUID().toString());
 
 			try {
-				final Map<StateHandleID, StreamStateHandle> newSstFiles =
-					restoreStateHandle.getCreatedSharedState();
-				final Map<StateHandleID, StreamStateHandle> oldSstFiles =
-					restoreStateHandle.getReferencedSharedState();
+				final Map<StateHandleID, StreamStateHandle> sstFiles =
+					restoreStateHandle.getSharedState();
 				final Map<StateHandleID, StreamStateHandle> miscFiles =
 					restoreStateHandle.getPrivateState();
 
-				readAllStateData(newSstFiles, restoreInstancePath);
-				readAllStateData(oldSstFiles, restoreInstancePath);
+				readAllStateData(sstFiles, restoreInstancePath);
 				readAllStateData(miscFiles, restoreInstancePath);
 
 				// read meta data
@@ -1409,8 +1397,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						throw new IOException("Could not create RocksDB data directory.");
 					}
 
-					createFileHardLinksInRestorePath(newSstFiles, restoreInstancePath);
-					createFileHardLinksInRestorePath(oldSstFiles, restoreInstancePath);
+					createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
 					createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
 
 					List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
@@ -1437,10 +1424,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 
 					// use the restore sst files as the base for succeeding checkpoints
-					Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
-					sstFiles.putAll(newSstFiles);
-					sstFiles.putAll(oldSstFiles);
-					stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles);
+					stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
 
 					stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9340455..89eb1d5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,18 +26,22 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -58,7 +62,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.RunnableFuture;
 
 import static junit.framework.TestCase.assertNotNull;
@@ -67,6 +75,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -351,6 +360,83 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 		assertEquals(1, allFilesInDbDir.size());
 	}
 
+	@Test
+	public void testSharedIncrementalStateDeRegistration() throws Exception {
+		if (enableIncrementalCheckpointing) {
+			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+			ValueStateDescriptor<String> kvId =
+				new ValueStateDescriptor<>("id", String.class, null);
+
+			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+			ValueState<String> state =
+				backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+
+			Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>();
+			SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
+			for (int checkpointId = 0; checkpointId < 3; ++checkpointId) {
+
+				reset(sharedStateRegistry);
+
+				backend.setCurrentKey(checkpointId);
+				state.update("Hello-" + checkpointId);
+
+				RunnableFuture<KeyedStateHandle> snapshot = backend.snapshot(
+					checkpointId,
+					checkpointId,
+					createStreamFactory(),
+					CheckpointOptions.forFullCheckpoint());
+
+				snapshot.run();
+
+				IncrementalKeyedStateHandle stateHandle = (IncrementalKeyedStateHandle) snapshot.get();
+				Map<StateHandleID, StreamStateHandle> sharedState =
+					new HashMap<>(stateHandle.getSharedState());
+
+				stateHandle.registerSharedStates(sharedStateRegistry);
+
+				for (Map.Entry<StateHandleID, StreamStateHandle> e : sharedState.entrySet()) {
+					verify(sharedStateRegistry).registerReference(
+						stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()),
+						e.getValue());
+				}
+
+				previousStateHandles.add(stateHandle);
+				backend.notifyCheckpointComplete(checkpointId);
+
+				//-----------------------------------------------------------------
+
+				if (previousStateHandles.size() > 1) {
+					checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+				}
+			}
+
+			while (!previousStateHandles.isEmpty()) {
+
+				reset(sharedStateRegistry);
+
+				checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+			}
+
+			backend.close();
+			backend.dispose();
+		}
+	}
+
+	private void checkRemove(IncrementalKeyedStateHandle remove, SharedStateRegistry registry) throws Exception {
+		for (StateHandleID id : remove.getSharedState().keySet()) {
+			verify(registry, times(0)).unregisterReference(
+				remove.createSharedStateRegistryKeyFromFileName(id));
+		}
+
+		remove.discardState();
+
+		for (StateHandleID id : remove.getSharedState().keySet()) {
+			verify(registry).unregisterReference(
+				remove.createSharedStateRegistryKeyFromFileName(id));
+		}
+	}
 
 	private void runStateUpdates() throws Exception{
 		for (int i = 50; i < 150; ++i) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 1ab5b41..b382080 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -25,8 +25,6 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -177,13 +175,13 @@ public class CompletedCheckpoint implements Serializable {
 	}
 
 	public void discardOnFailedStoring() throws Exception {
-		new UnstoredDiscardStategy().discard();
+		doDiscard();
 	}
 
 	public boolean discardOnSubsume(SharedStateRegistry sharedStateRegistry) throws Exception {
 
 		if (props.discardOnSubsumed()) {
-			new StoredDiscardStrategy(sharedStateRegistry).discard();
+			doDiscard();
 			return true;
 		}
 
@@ -197,7 +195,7 @@ public class CompletedCheckpoint implements Serializable {
 				jobStatus == JobStatus.FAILED && props.discardOnJobFailed() ||
 				jobStatus == JobStatus.SUSPENDED && props.discardOnJobSuspended()) {
 
-			new StoredDiscardStrategy(sharedStateRegistry).discard();
+			doDiscard();
 			return true;
 		} else {
 			if (externalPointer != null) {
@@ -209,6 +207,42 @@ public class CompletedCheckpoint implements Serializable {
 		}
 	}
 
+	private void doDiscard() throws Exception {
+
+		try {
+			// collect exceptions and continue cleanup
+			Exception exception = null;
+
+			// drop the metadata, if we have some
+			if (externalizedMetadata != null) {
+				try {
+					externalizedMetadata.discardState();
+				} catch (Exception e) {
+					exception = e;
+				}
+			}
+
+			// discard private state objects
+			try {
+				StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			if (exception != null) {
+				throw exception;
+			}
+		} finally {
+			operatorStates.clear();
+
+			// to be null-pointer safe, copy reference to stack
+			CompletedCheckpointStats.DiscardCallback discardCallback = this.discardCallback;
+			if (discardCallback != null) {
+				discardCallback.notifyDiscardedCheckpoint();
+			}
+		}
+	}
+
 	public long getStateSize() {
 		long result = 0L;
 
@@ -252,7 +286,7 @@ public class CompletedCheckpoint implements Serializable {
 
 	/**
 	 * Register all shared states in the given registry. This is method is called
-	 * when the completed checkpoint has been successfully added into the store.
+	 * before the checkpoint is added into the store.
 	 *
 	 * @param sharedStateRegistry The registry where shared states are registered
 	 */
@@ -266,102 +300,4 @@ public class CompletedCheckpoint implements Serializable {
 	public String toString() {
 		return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
 	}
-
-	/**
-	 * Base class for the discarding strategies of {@link CompletedCheckpoint}.
-	 */
-	private abstract class DiscardStrategy {
-
-		protected Exception storedException;
-
-		public DiscardStrategy() {
-			this.storedException = null;
-		}
-
-		public void discard() throws Exception {
-
-			try {
-				// collect exceptions and continue cleanup
-				storedException = null;
-
-				doDiscardExternalizedMetaData();
-				doDiscardSharedState();
-				doDiscardPrivateState();
-				doReportStoredExceptions();
-			} finally {
-				clearTaskStatesAndNotifyDiscardCompleted();
-			}
-		}
-
-		protected void doDiscardExternalizedMetaData() {
-			// drop the metadata, if we have some
-			if (externalizedMetadata != null) {
-				try {
-					externalizedMetadata.discardState();
-				} catch (Exception e) {
-					storedException = e;
-				}
-			}
-		}
-
-		protected void doDiscardPrivateState() {
-			// discard private state objects
-			try {
-				StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
-			} catch (Exception e) {
-				storedException = ExceptionUtils.firstOrSuppressed(e, storedException);
-			}
-		}
-
-		protected abstract void doDiscardSharedState();
-
-		protected void doReportStoredExceptions() throws Exception {
-			if (storedException != null) {
-				throw storedException;
-			}
-		}
-
-		protected void clearTaskStatesAndNotifyDiscardCompleted() {
-			operatorStates.clear();
-			// to be null-pointer safe, copy reference to stack
-			CompletedCheckpointStats.DiscardCallback discardCallback =
-				CompletedCheckpoint.this.discardCallback;
-
-			if (discardCallback != null) {
-				discardCallback.notifyDiscardedCheckpoint();
-			}
-		}
-	}
-
-	/**
-	 * Discard all shared states created in the checkpoint. This strategy is applied
-	 * when the completed checkpoint fails to be added into the store.
-	 */
-	private class UnstoredDiscardStategy extends CompletedCheckpoint.DiscardStrategy {
-
-		@Override
-		protected void doDiscardSharedState() {
-			// nothing to do because we did not register any shared state yet. unregistered, new
-			// shared state is then still considered private state and deleted as part of
-			// doDiscardPrivateState().
-		}
-	}
-
-	/**
-	 * Unregister all shared states from the given registry. This is strategy is
-	 * applied when the completed checkpoint is subsumed or the job terminates.
-	 */
-	private class StoredDiscardStrategy extends CompletedCheckpoint.DiscardStrategy {
-
-		SharedStateRegistry sharedStateRegistry;
-
-		public StoredDiscardStrategy(SharedStateRegistry sharedStateRegistry) {
-			this.sharedStateRegistry = Preconditions.checkNotNull(sharedStateRegistry);
-		}
-
-		@Override
-		protected void doDiscardSharedState() {
-			sharedStateRegistry.unregisterAll(operatorStates.values());
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index aa676e7..b153028 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -126,13 +126,6 @@ public class OperatorState implements CompositeStateHandle {
 	}
 
 	@Override
-	public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
-		for (OperatorSubtaskState operatorSubtaskState : operatorSubtaskStates.values()) {
-			operatorSubtaskState.unregisterSharedStates(sharedStateRegistry);
-		}
-	}
-
-	@Override
 	public long getStateSize() {
 		long result = 0L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index 49ef863..e2ae632 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -158,17 +158,6 @@ public class OperatorSubtaskState implements CompositeStateHandle {
 	}
 
 	@Override
-	public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
-		if (managedKeyedState != null) {
-			managedKeyedState.unregisterSharedStates(sharedStateRegistry);
-		}
-
-		if (rawKeyedState != null) {
-			rawKeyedState.unregisterSharedStates(sharedStateRegistry);
-		}
-	}
-
-	@Override
 	public long getStateSize() {
 		return stateSize;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index f5e1db3..233cfc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -63,10 +63,10 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpo
 	@Override
 	public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
 		
-		checkpoints.addLast(checkpoint);
-
 		checkpoint.registerSharedStates(sharedStateRegistry);
 
+		checkpoints.addLast(checkpoint);
+
 		if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
 			try {
 				CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index a77baf3..20d675b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -162,17 +162,6 @@ public class SubtaskState implements CompositeStateHandle {
 	}
 
 	@Override
-	public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
-		if (managedKeyedState != null) {
-			managedKeyedState.unregisterSharedStates(sharedStateRegistry);
-		}
-
-		if (rawKeyedState != null) {
-			rawKeyedState.unregisterSharedStates(sharedStateRegistry);
-		}
-	}
-
-	@Override
 	public long getStateSize() {
 		return stateSize;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index aa5c516..ed847a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -141,13 +141,6 @@ public class TaskState implements CompositeStateHandle {
 	}
 
 	@Override
-	public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
-		for (SubtaskState subtaskState : subtaskStates.values()) {
-			subtaskState.unregisterSharedStates(sharedStateRegistry);
-		}
-	}
-
-	@Override
 	public long getStateSize() {
 		long result = 0L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 084d93e..4c3c1ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -35,7 +35,6 @@ import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -79,7 +78,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 	private final int maxNumberOfCheckpointsToRetain;
 
 	/** Local completed checkpoints. */
-	private final ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
+	private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
 	/**
 	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
@@ -122,7 +121,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
 
-		this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
+		this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 		
 		LOG.info("Initialized in '{}'.", checkpointsPath);
 	}
@@ -146,7 +145,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		// Clear local handles in order to prevent duplicates on
 		// recovery. The local handles should reflect the state
 		// of ZooKeeper.
-		checkpointStateHandles.clear();
+		completedCheckpoints.clear();
 
 		// Get all there is first
 		List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
@@ -170,6 +169,11 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 			try {
 				completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
+				if (completedCheckpoint != null) {
+					// Re-register all shared states in the checkpoint.
+					completedCheckpoint.registerSharedStates(sharedStateRegistry);
+					completedCheckpoints.add(completedCheckpoint);
+				}
 			} catch (Exception e) {
 				LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
 					"checkpoint store.", e);
@@ -177,11 +181,6 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 				// remove the checkpoint with broken state handle
 				removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
 			}
-
-			if (completedCheckpoint != null) {
-				completedCheckpoint.registerSharedStates(sharedStateRegistry);
-				checkpointStateHandles.add(checkpointStateHandle);
-			}
 		}
 	}
 
@@ -195,20 +194,19 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		checkNotNull(checkpoint, "Checkpoint");
 		
 		final String path = checkpointIdToPath(checkpoint.getCheckpointID());
-		final RetrievableStateHandle<CompletedCheckpoint> stateHandle;
 
-		// First add the new one. If it fails, we don't want to loose existing data.
-		stateHandle = checkpointsInZooKeeper.addAndLock(path, checkpoint);
+		// First, register all shared states in the checkpoint to consolidate placeholders.
+		checkpoint.registerSharedStates(sharedStateRegistry);
 
-		checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));
+		// Now add the new one. If it fails, we don't want to lose existing data.
+		checkpointsInZooKeeper.addAndLock(path, checkpoint);
 
-		// Register all shared states in the checkpoint
-		checkpoint.registerSharedStates(sharedStateRegistry);
+		completedCheckpoints.addLast(checkpoint);
 
 		// Everything worked, let's remove a previous checkpoint if necessary.
-		while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
+		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
 			try {
-				removeSubsumed(checkpointStateHandles.removeFirst().f1, sharedStateRegistry);
+				removeSubsumed(completedCheckpoints.removeFirst(), sharedStateRegistry);
 			} catch (Exception e) {
 				LOG.warn("Failed to subsume the old checkpoint", e);
 			}
@@ -219,60 +217,23 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 	@Override
 	public CompletedCheckpoint getLatestCheckpoint() {
-		if (checkpointStateHandles.isEmpty()) {
+		if (completedCheckpoints.isEmpty()) {
 			return null;
 		}
 		else {
-			while(!checkpointStateHandles.isEmpty()) {
-				Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast();
-
-				try {
-					return retrieveCompletedCheckpoint(checkpointStateHandle);
-				} catch (Exception e) {
-					LOG.warn("Could not retrieve latest checkpoint. Removing it from " +
-						"the completed checkpoint store.", e);
-
-					try {
-						// remove the checkpoint with broken state handle
-						Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint = checkpointStateHandles.pollLast();
-						removeBrokenStateHandle(checkpoint.f1, checkpoint.f0);
-					} catch (Exception removeException) {
-						LOG.warn("Could not remove the latest checkpoint with a broken state handle.", removeException);
-					}
-				}
-			}
-
-			return null;
+			return completedCheckpoints.peekLast();
 		}
 	}
 
 	@Override
 	public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
-		List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size());
-
-		Iterator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> stateHandleIterator = checkpointStateHandles.iterator();
-
-		while (stateHandleIterator.hasNext()) {
-			Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandlePath = stateHandleIterator.next();
-
-			try {
-				checkpoints.add(retrieveCompletedCheckpoint(stateHandlePath));
-			} catch (Exception e) {
-				LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
-					"checkpoint store.", e);
-
-				// remove the checkpoint with broken state handle
-				stateHandleIterator.remove();
-				removeBrokenStateHandle(stateHandlePath.f1, stateHandlePath.f0);
-			}
-		}
-
+		List<CompletedCheckpoint> checkpoints = new ArrayList<>(completedCheckpoints);
 		return checkpoints;
 	}
 
 	@Override
 	public int getNumberOfRetainedCheckpoints() {
-		return checkpointStateHandles.size();
+		return completedCheckpoints.size();
 	}
 
 	@Override
@@ -285,15 +246,15 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		if (jobStatus.isGloballyTerminalState()) {
 			LOG.info("Shutting down");
 
-			for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
+			for (CompletedCheckpoint checkpoint : completedCheckpoints) {
 				try {
-					removeShutdown(checkpoint.f1, jobStatus, sharedStateRegistry);
+					removeShutdown(checkpoint, jobStatus, sharedStateRegistry);
 				} catch (Exception e) {
 					LOG.error("Failed to discard checkpoint.", e);
 				}
 			}
 
-			checkpointStateHandles.clear();
+			completedCheckpoints.clear();
 
 			String path = "/" + client.getNamespace();
 
@@ -303,7 +264,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 			LOG.info("Suspending");
 
 			// Clear the local handles, but don't remove any state
-			checkpointStateHandles.clear();
+			completedCheckpoints.clear();
 
 			// Release the state handle locks in ZooKeeper such that they can be deleted
 			checkpointsInZooKeeper.releaseAll();
@@ -313,21 +274,18 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 	// ------------------------------------------------------------------------
 
 	private void removeSubsumed(
-		final String pathInZooKeeper,
+		final CompletedCheckpoint completedCheckpoint,
 		final SharedStateRegistry sharedStateRegistry) throws Exception {
-		
-		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-			@Override
-			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-				if (value != null) {
-					final CompletedCheckpoint completedCheckpoint;
-					try {
-						completedCheckpoint = value.retrieveState();
-					} catch (Exception e) {
-						throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e);
-					}
 
-					if (completedCheckpoint != null) {
+		if(completedCheckpoint == null) {
+			return;
+		}
+
+		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action =
+			new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
+				@Override
+				public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
+					if (value != null) {
 						try {
 							completedCheckpoint.discardOnSubsume(sharedStateRegistry);
 						} catch (Exception e) {
@@ -335,46 +293,41 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 						}
 					}
 				}
-			}
-		};
+			};
 
-		checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, action);
+		checkpointsInZooKeeper.releaseAndTryRemove(
+			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
+			action);
 	}
 
 	private void removeShutdown(
-			final String pathInZooKeeper,
+			final CompletedCheckpoint completedCheckpoint,
 			final JobStatus jobStatus,
 			final SharedStateRegistry sharedStateRegistry) throws Exception {
 
+		if(completedCheckpoint == null) {
+			return;
+		}
+
 		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
 			@Override
 			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-				if (value != null) {
-					final CompletedCheckpoint completedCheckpoint;
-
-					try {
-						completedCheckpoint = value.retrieveState();
-					} catch (Exception e) {
-						throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e);
-					}
-
-					if (completedCheckpoint != null) {
-						try {
-							completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
-						} catch (Exception e) {
-							throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
-						}
-					}
+				try {
+					completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
+				} catch (Exception e) {
+					throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
 				}
 			}
 		};
 
-		checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, removeAction);
+		checkpointsInZooKeeper.releaseAndTryRemove(
+			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
+			removeAction);
 	}
 
 	private void removeBrokenStateHandle(
-			final String pathInZooKeeper,
-			final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle) throws Exception {
+		final String pathInZooKeeper,
+		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle) throws Exception {
 		checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
 			@Override
 			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index b71418b..da0022c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
@@ -75,7 +74,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 	private static final byte KEY_GROUPS_HANDLE = 3;
 	private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
 	private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
-	private static final byte PLACEHOLDER_STREAM_STATE_HANDLE = 6;
 
 	/** The singleton instance of the serializer */
 	public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer();
@@ -328,8 +326,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 
 			serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), dos);
 
-			serializeStreamStateHandleMap(incrementalKeyedStateHandle.getCreatedSharedState(), dos);
-			serializeStreamStateHandleMap(incrementalKeyedStateHandle.getReferencedSharedState(), dos);
+			serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), dos);
 			serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos);
 		} else {
 			throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
@@ -390,16 +387,14 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 				KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
 
 			StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis);
-			Map<StateHandleID, StreamStateHandle> createdStates = deserializeStreamStateHandleMap(dis);
-			Map<StateHandleID, StreamStateHandle> referencedStates = deserializeStreamStateHandleMap(dis);
+			Map<StateHandleID, StreamStateHandle> sharedStates = deserializeStreamStateHandleMap(dis);
 			Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis);
 
 			return new IncrementalKeyedStateHandle(
 				operatorId,
 				keyGroupRange,
 				checkpointId,
-				createdStates,
-				referencedStates,
+				sharedStates,
 				privateStates,
 				metaDataStateHandle);
 		} else {
@@ -485,10 +480,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 			byte[] internalData = byteStreamStateHandle.getData();
 			dos.writeInt(internalData.length);
 			dos.write(byteStreamStateHandle.getData());
-		} else if (stateHandle instanceof PlaceholderStreamStateHandle) {
-			PlaceholderStreamStateHandle placeholder = (PlaceholderStreamStateHandle) stateHandle;
-			dos.writeByte(PLACEHOLDER_STREAM_STATE_HANDLE);
-			dos.writeLong(placeholder.getStateSize());
 		} else {
 			throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
 		}
@@ -510,8 +501,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 			byte[] data = new byte[numBytes];
 			dis.readFully(data);
 			return new ByteStreamStateHandle(handleName, data);
-		} else if (PLACEHOLDER_STREAM_STATE_HANDLE == type) {
-			return new PlaceholderStreamStateHandle(dis.readLong());
 		} else {
 			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
index 002b7c3..1bc6a0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
@@ -34,7 +34,8 @@ package org.apache.flink.runtime.state;
  * this handle and considered as private state until it is registered for the first time. Registration
  * transfers ownership to the {@link SharedStateRegistry}.
  * The composite state handle should only delete all private states in the
- * {@link StateObject#discardState()} method.
+ * {@link StateObject#discardState()} method, the {@link SharedStateRegistry} is responsible for
+ * deleting shared states after they were registered.
  */
 public interface CompositeStateHandle extends StateObject {
 
@@ -45,18 +46,10 @@ public interface CompositeStateHandle extends StateObject {
 	 * <p>
 	 * After this is completed, newly created shared state is considered as published is no longer
 	 * owned by this handle. This means that it should no longer be deleted as part of calls to
-	 * {@link #discardState()}.
+	 * {@link #discardState()}. Instead, {@link #discardState()} will trigger an unregistration
+	 * from the registry.
 	 *
 	 * @param stateRegistry The registry where shared states are registered.
 	 */
 	void registerSharedStates(SharedStateRegistry stateRegistry);
-
-	/**
-	 * Unregister both created and referenced shared states in the given
-	 * {@link SharedStateRegistry}. This method is called when the checkpoint is
-	 * subsumed or the job is shut down.
-	 *
-	 * @param stateRegistry The registry where shared states are registered.
-	 */
-	void unregisterSharedStates(SharedStateRegistry stateRegistry);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 706e219..770b5a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -28,18 +28,24 @@ import java.util.Map;
 /**
  * The handle to states of an incremental snapshot.
  * <p>
- * The states contained in an incremental snapshot include
+ * The states contained in an incremental snapshot include:
  * <ul>
- * <li> Created shared state which includes (the supposed to be) shared files produced since the last
+ * <li> Created shared state which includes shared files produced since the last
  * completed checkpoint. These files can be referenced by succeeding checkpoints if the
  * checkpoint succeeds to complete. </li>
  * <li> Referenced shared state which includes the shared files materialized in previous
- * checkpoints. </li>
+ * checkpoints. Until this is registered with a {@link SharedStateRegistry}, all referenced
+ * shared state handles are only placeholders, so that we do not send state handles twice
+ * from which we know that they already exist on the checkpoint coordinator.</li>
  * <li> Private state which includes all other files, typically mutable, that cannot be shared by
  * other checkpoints. </li>
  * <li> Backend meta state which includes the information of existing states. </li>
  * </ul>
  *
+ * When this should become a completed checkpoint on the checkpoint coordinator, it must first be
+ * registered with a {@link SharedStateRegistry}, so that all placeholder state handles to
+ * previously existing state are replaced with the originals.
+ *
  * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They
  * should not be called from production code. This means this class is also not suited to serve as
  * a key, e.g. in hash maps.
@@ -66,14 +72,9 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	private final long checkpointId;
 
 	/**
-	 * State that the incremental checkpoint created new
-	 */
-	private final Map<StateHandleID, StreamStateHandle> createdSharedState;
-
-	/**
-	 * State that the incremental checkpoint references from previous checkpoints
+	 * Shared state in the incremental checkpoint.
 	 */
-	private final Map<StateHandleID, StreamStateHandle> referencedSharedState;
+	private final Map<StateHandleID, StreamStateHandle> sharedState;
 
 	/**
 	 * Private state in the incremental checkpoint
@@ -86,32 +87,30 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	private final StreamStateHandle metaStateHandle;
 
 	/**
-	 * True if the state handle has already registered shared states.
-	 * <p>
-	 * Once the shared states are registered, it's the {@link SharedStateRegistry}'s
-	 * responsibility to maintain the shared states. But in the cases where the
-	 * state handle is discarded before performing the registration, the handle
-	 * should delete all the shared states created by it.
+	 * Once the shared states are registered, it is the {@link SharedStateRegistry}'s
+	 * responsibility to cleanup those shared states.
+	 * But in the cases where the state handle is discarded before performing the registration,
+	 * the handle should delete all the shared states created by it.
+	 *
+	 * This variable is not null iff the handle was registered.
 	 */
-	private boolean registered;
+	private transient SharedStateRegistry sharedStateRegistry;
 
 	public IncrementalKeyedStateHandle(
 		String operatorIdentifier,
 		KeyGroupRange keyGroupRange,
 		long checkpointId,
-		Map<StateHandleID, StreamStateHandle> createdSharedState,
-		Map<StateHandleID, StreamStateHandle> referencedSharedState,
+		Map<StateHandleID, StreamStateHandle> sharedState,
 		Map<StateHandleID, StreamStateHandle> privateState,
 		StreamStateHandle metaStateHandle) {
 
 		this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
 		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 		this.checkpointId = checkpointId;
-		this.createdSharedState = Preconditions.checkNotNull(createdSharedState);
-		this.referencedSharedState = Preconditions.checkNotNull(referencedSharedState);
+		this.sharedState = Preconditions.checkNotNull(sharedState);
 		this.privateState = Preconditions.checkNotNull(privateState);
 		this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle);
-		this.registered = false;
+		this.sharedStateRegistry = null;
 	}
 
 	@Override
@@ -123,12 +122,8 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		return checkpointId;
 	}
 
-	public Map<StateHandleID, StreamStateHandle> getCreatedSharedState() {
-		return createdSharedState;
-	}
-
-	public Map<StateHandleID, StreamStateHandle> getReferencedSharedState() {
-		return referencedSharedState;
+	public Map<StateHandleID, StreamStateHandle> getSharedState() {
+		return sharedState;
 	}
 
 	public Map<StateHandleID, StreamStateHandle> getPrivateState() {
@@ -155,8 +150,6 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	@Override
 	public void discardState() throws Exception {
 
-		Preconditions.checkState(!registered, "Attempt to dispose a registered composite state with registered shared state. Must unregister first.");
-
 		try {
 			metaStateHandle.discardState();
 		} catch (Exception e) {
@@ -169,37 +162,35 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 			LOG.warn("Could not properly discard misc file states.", e);
 		}
 
-		try {
-			StateUtil.bestEffortDiscardAllStateObjects(createdSharedState.values());
-		} catch (Exception e) {
-			LOG.warn("Could not properly discard new sst file states.", e);
+		// If this was not registered, we can delete the shared state. We can simply apply this
+		// to all handles, because all handles that have not been created for the first time for this
+		// are only placeholders at this point (disposing them is a NOP).
+		if (sharedStateRegistry == null) {
+			try {
+				StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+			} catch (Exception e) {
+				LOG.warn("Could not properly discard new sst file states.", e);
+			}
+		} else {
+			// If this was registered, we only unregister all our referenced shared states
+			// from the registry.
+			for (StateHandleID stateHandleID : sharedState.keySet()) {
+				sharedStateRegistry.unregisterReference(
+					createSharedStateRegistryKeyFromFileName(stateHandleID));
+			}
 		}
-
 	}
 
 	@Override
 	public long getStateSize() {
-		long size = getPrivateStateSize();
-
-		for (StreamStateHandle oldSstFileHandle : referencedSharedState.values()) {
-			size += oldSstFileHandle.getStateSize();
-		}
-
-		return size;
-	}
-
-	/**
-	 * Returns the size of the state that is privately owned by this handle.
-	 */
-	public long getPrivateStateSize() {
 		long size = StateUtil.getStateSize(metaStateHandle);
 
-		for (StreamStateHandle newSstFileHandle : createdSharedState.values()) {
-			size += newSstFileHandle.getStateSize();
+		for (StreamStateHandle sharedStateHandle : sharedState.values()) {
+			size += sharedStateHandle.getStateSize();
 		}
 
-		for (StreamStateHandle miscFileHandle : privateState.values()) {
-			size += miscFileHandle.getStateSize();
+		for (StreamStateHandle privateStateHandle : privateState.values()) {
+			size += privateStateHandle.getStateSize();
 		}
 
 		return size;
@@ -208,64 +199,38 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	@Override
 	public void registerSharedStates(SharedStateRegistry stateRegistry) {
 
-		Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
+		Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states.");
+
+		sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
 
-		for (Map.Entry<StateHandleID, StreamStateHandle> newSstFileEntry : createdSharedState.entrySet()) {
+		for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
 			SharedStateRegistryKey registryKey =
-				createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
+				createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
 
 			SharedStateRegistry.Result result =
-				stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
-
-			// We update our reference with the result from the registry, to prevent the following
-			// problem:
+				stateRegistry.registerReference(registryKey, sharedStateHandle.getValue());
+
+			// This step consolidates our shared handles with the registry, which does two things:
+			//
+			// 1) Replace placeholder state handle with already registered, actual state handles.
+			//
+			// 2) Deduplicate re-uploads of incremental state due to missing confirmations about
+			// completed checkpoints.
+			//
+			// This prevents the following problem:
 			// A previous checkpoint n has already registered the state. This can happen if a
 			// following checkpoint (n + x) wants to reference the same state before the backend got
 			// notified that checkpoint n completed. In this case, the shared registry did
 			// deduplication and returns the previous reference.
-			newSstFileEntry.setValue(result.getReference());
-		}
-
-		for (Map.Entry<StateHandleID, StreamStateHandle> oldSstFileName : referencedSharedState.entrySet()) {
-			SharedStateRegistryKey registryKey =
-				createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
-
-			SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey);
-
-			// Again we update our state handle with the result from the registry, thus replacing
-			// placeholder state handles with the originals.
-			oldSstFileName.setValue(result.getReference());
-		}
-
-		// Migrate state from unregistered to registered, so that it will not count as private state
-		// for #discardState() from now.
-		referencedSharedState.putAll(createdSharedState);
-		createdSharedState.clear();
-
-		registered = true;
-	}
-
-	@Override
-	public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
-
-		Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
-
-		for (Map.Entry<StateHandleID, StreamStateHandle> newSstFileEntry : createdSharedState.entrySet()) {
-			SharedStateRegistryKey registryKey =
-				createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
-			stateRegistry.releaseReference(registryKey);
-		}
-
-		for (Map.Entry<StateHandleID, StreamStateHandle> oldSstFileEntry : referencedSharedState.entrySet()) {
-			SharedStateRegistryKey registryKey =
-				createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey());
-			stateRegistry.releaseReference(registryKey);
+			sharedStateHandle.setValue(result.getReference());
 		}
-
-		registered = false;
 	}
 
-	private SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+	/**
+	 * Create a unique key to register one of our shared state handles.
+	 */
+	@VisibleForTesting
+	public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
 		return new SharedStateRegistryKey(operatorIdentifier + '-' + keyGroupRange, shId);
 	}
 
@@ -293,10 +258,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
 			return false;
 		}
-		if (!getCreatedSharedState().equals(that.getCreatedSharedState())) {
-			return false;
-		}
-		if (!getReferencedSharedState().equals(that.getReferencedSharedState())) {
+		if (!getSharedState().equals(that.getSharedState())) {
 			return false;
 		}
 		if (!getPrivateState().equals(that.getPrivateState())) {
@@ -314,8 +276,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		int result = getOperatorIdentifier().hashCode();
 		result = 31 * result + getKeyGroupRange().hashCode();
 		result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32));
-		result = 31 * result + getCreatedSharedState().hashCode();
-		result = 31 * result + getReferencedSharedState().hashCode();
+		result = 31 * result + getSharedState().hashCode();
 		result = 31 * result + getPrivateState().hashCode();
 		result = 31 * result + getMetaStateHandle().hashCode();
 		return result;

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 8280460..8e38ad4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -98,11 +98,6 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle
 	}
 
 	@Override
-	public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
-		// No shared states
-	}
-
-	@Override
 	public void discardState() throws Exception {
 		stateHandle.discardState();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
index 2136061..7c948a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
@@ -18,29 +18,20 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 /**
  * A placeholder state handle for shared state that will replaced by an original that was
- * created in a previous checkpoint. So we don't have to send the handle twice, e.g. in
- * case of {@link ByteStreamStateHandle}. To be used in the referenced states of
+ * created in a previous checkpoint. So we don't have to send a state handle twice, e.g. in
+ * case of {@link ByteStreamStateHandle}. This class is used in the referenced states of
  * {@link IncrementalKeyedStateHandle}.
- * <p>
- * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They
- * should not be called from production code. This means this class is also not suited to serve as
- * a key, e.g. in hash maps.
  */
 public class PlaceholderStreamStateHandle implements StreamStateHandle {
 
 	private static final long serialVersionUID = 1L;
 
-	/** We remember the size of the original file for which this is a placeholder */
-	private final long originalSize;
-
-	public PlaceholderStreamStateHandle(long originalSize) {
-		this.originalSize = originalSize;
+	public PlaceholderStreamStateHandle() {
 	}
 
 	@Override
@@ -56,33 +47,6 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle {
 
 	@Override
 	public long getStateSize() {
-		return originalSize;
-	}
-
-	/**
-	 * This method is should only be called in tests! This should never serve as key in a hash map.
-	 */
-	@VisibleForTesting
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		PlaceholderStreamStateHandle that = (PlaceholderStreamStateHandle) o;
-
-		return originalSize == that.originalSize;
-	}
-
-	/**
-	 * This method is should only be called in tests! This should never serve as key in a hash map.
-	 */
-	@VisibleForTesting
-	@Override
-	public int hashCode() {
-		return (int) (originalSize ^ (originalSize >>> 32));
+		return 0L;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index f9161b0..a5e0f84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -54,7 +54,7 @@ public class SharedStateRegistry {
 	}
 
 	/**
-	 * Register a reference to the given (supposedly new) shared state in the registry.
+	 * Register a reference to the given shared state in the registry.
 	 * This does the following: We check if the state handle is actually new by the
 	 * registrationKey. If it is new, we register it with a reference count of 1. If there is
 	 * already a state handle registered under the given key, we dispose the given "new" state
@@ -62,14 +62,14 @@ public class SharedStateRegistry {
 	 * a replacement with the result.
 	 *
 	 * <p>IMPORTANT: caller should check the state handle returned by the result, because the
-	 * registry is performing deduplication and could potentially return a handle that is supposed
+	 * registry is performing de-duplication and could potentially return a handle that is supposed
 	 * to replace the one from the registration request.
 	 *
 	 * @param state the shared state for which we register a reference.
 	 * @return the result of this registration request, consisting of the state handle that is
 	 * registered under the key by the end of the oepration and its current reference count.
 	 */
-	public Result registerNewReference(SharedStateRegistryKey registrationKey, StreamStateHandle state) {
+	public Result registerReference(SharedStateRegistryKey registrationKey, StreamStateHandle state) {
 
 		Preconditions.checkNotNull(state);
 
@@ -96,28 +96,6 @@ public class SharedStateRegistry {
 	}
 
 	/**
-	 * Obtains one reference to the given shared state in the registry. This increases the
-	 * reference count by one.
-	 *
-	 * @param registrationKey the shared state for which we obtain a reference.
-	 * @return the shared state for which we release a reference.
-	 * @return the result of the request, consisting of the reference count after this operation
-	 * and the state handle.
-	 */
-	public Result obtainReference(SharedStateRegistryKey registrationKey) {
-
-		Preconditions.checkNotNull(registrationKey);
-
-		synchronized (registeredStates) {
-			SharedStateRegistry.SharedStateEntry entry =
-				Preconditions.checkNotNull(registeredStates.get(registrationKey),
-					"Could not find a state for the given registration key!");
-			entry.increaseReferenceCount();
-			return new Result(entry);
-		}
-	}
-
-	/**
 	 * Releases one reference to the given shared state in the registry. This decreases the
 	 * reference count by one. Once the count reaches zero, the shared state is deleted.
 	 *
@@ -125,7 +103,7 @@ public class SharedStateRegistry {
 	 * @return the result of the request, consisting of the reference count after this operation
 	 * and the state handle, or null if the state handle was deleted through this request.
 	 */
-	public Result releaseReference(SharedStateRegistryKey registrationKey) {
+	public Result unregisterReference(SharedStateRegistryKey registrationKey) {
 
 		Preconditions.checkNotNull(registrationKey);
 
@@ -172,30 +150,18 @@ public class SharedStateRegistry {
 		}
 	}
 
-	/**
-	 * Unregister all the shared states referenced by the given.
-	 *
-	 * @param stateHandles The shared states to unregister.
-	 */
-	public void unregisterAll(Iterable<? extends CompositeStateHandle> stateHandles) {
-		if (stateHandles == null) {
-			return;
-		}
-
-		synchronized (registeredStates) {
-			for (CompositeStateHandle stateHandle : stateHandles) {
-				stateHandle.unregisterSharedStates(this);
-			}
-		}
-	}
-
 	private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
-		if (streamStateHandle != null) {
+		// We do the small optimization to not issue discards for placeholders, which are NOPs.
+		if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
 			asyncDisposalExecutor.execute(
 				new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
 		}
 	}
 
+	private boolean isPlaceholder(StreamStateHandle stateHandle) {
+		return stateHandle instanceof PlaceholderStreamStateHandle;
+	}
+
 	/**
 	 * An entry in the registry, tracking the handle and the corresponding reference count.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 42703f8..9ba9d35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -91,6 +91,13 @@ public class ByteStreamStateHandle implements StreamStateHandle {
 		return 31 * handleName.hashCode();
 	}
 
+	@Override
+	public String toString() {
+		return "ByteStreamStateHandle{" +
+			"handleName='" + handleName + '\'' +
+			'}';
+	}
+
 	/**
 	 * An input stream view on a byte array.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 9250634..3b44d9a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -639,12 +639,6 @@ public class CheckpointCoordinatorTest {
 			assertEquals(checkpointIdNew, successNew.getCheckpointID());
 			assertTrue(successNew.getOperatorStates().isEmpty());
 
-			// validate that the subtask states in old savepoint have unregister their shared states
-			{
-				verify(subtaskState1, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-				verify(subtaskState2, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-			}
-
 			// validate that the relevant tasks got a confirmation message
 			{
 				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
@@ -925,9 +919,6 @@ public class CheckpointCoordinatorTest {
 			verify(subtaskState1_2, times(1)).discardState();
 
 			// validate that all subtask states in the second checkpoint are not discarded
-			verify(subtaskState2_1, never()).unregisterSharedStates(any(SharedStateRegistry.class));
-			verify(subtaskState2_2, never()).unregisterSharedStates(any(SharedStateRegistry.class));
-			verify(subtaskState2_3, never()).unregisterSharedStates(any(SharedStateRegistry.class));
 			verify(subtaskState2_1, never()).discardState();
 			verify(subtaskState2_2, never()).discardState();
 			verify(subtaskState2_3, never()).discardState();
@@ -951,9 +942,6 @@ public class CheckpointCoordinatorTest {
 			coord.shutdown(JobStatus.FINISHED);
 
 			// validate that the states in the second checkpoint have been discarded
-			verify(subtaskState2_1, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-			verify(subtaskState2_2, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-			verify(subtaskState2_3, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
 			verify(subtaskState2_1, times(1)).discardState();
 			verify(subtaskState2_2, times(1)).discardState();
 			verify(subtaskState2_3, times(1)).discardState();
@@ -1562,10 +1550,6 @@ public class CheckpointCoordinatorTest {
 		verify(subtaskState1, never()).discardState();
 		verify(subtaskState2, never()).discardState();
 
-		// Savepoints are not supposed to have any shared state.
-		verify(subtaskState1, never()).unregisterSharedStates(any(SharedStateRegistry.class));
-		verify(subtaskState2, never()).unregisterSharedStates(any(SharedStateRegistry.class));
-
 		// validate that the relevant tasks got a confirmation message
 		{
 			verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
@@ -2088,15 +2072,6 @@ public class CheckpointCoordinatorTest {
 		// shutdown the store
 		store.shutdown(JobStatus.SUSPENDED);
 
-		// All shared states should be unregistered once the store is shut down
-		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
-			for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
-				for (OperatorSubtaskState subtaskState : taskState.getStates()) {
-					verify(subtaskState, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-				}
-			}
-		}
-
 		// restore the store
 		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 985c662..fb5d7c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -37,11 +37,6 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
  * Test for basic {@link CompletedCheckpointStore} contract.
@@ -114,12 +109,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			expected[i - 1].awaitDiscard();
 			assertTrue(expected[i - 1].isDiscarded());
 			assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
-
-			for (OperatorState operatorState : taskStates) {
-				for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
-					verify(subtaskState, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-				}
-			}
 		}
 	}
 
@@ -209,7 +198,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		operatorGroupState.put(operatorID, operatorState);
 
 		for (int i = 0; i < numberOfStates; i++) {
-			OperatorSubtaskState subtaskState = mock(OperatorSubtaskState.class);
+			OperatorSubtaskState subtaskState =
+				new TestOperatorSubtaskState();
 
 			operatorState.putState(i, subtaskState);
 		}
@@ -217,18 +207,10 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		return new TestCompletedCheckpoint(new JobID(), id, 0, operatorGroupState, props);
 	}
 
-	protected void resetCheckpoint(Collection<OperatorState> operatorStates) {
-		for (OperatorState operatorState : operatorStates) {
-			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
-				Mockito.reset(subtaskState);
-			}
-		}
-	}
-
 	protected void verifyCheckpointRegistered(Collection<OperatorState> operatorStates, SharedStateRegistry registry) {
 		for (OperatorState operatorState : operatorStates) {
 			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
-				verify(subtaskState, times(1)).registerSharedStates(eq(registry));
+				Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).registered);
 			}
 		}
 	}
@@ -236,7 +218,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	protected void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
 		for (OperatorState operatorState : operatorStates) {
 			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
-				verify(subtaskState, times(1)).discardState();
+				Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).discarded);
 			}
 		}
 	}
@@ -333,4 +315,37 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		}
 	}
 
+	static class TestOperatorSubtaskState extends OperatorSubtaskState {
+		private static final long serialVersionUID = 522580433699164230L;
+
+		boolean registered;
+		boolean discarded;
+
+		public TestOperatorSubtaskState() {
+			super(null, null, null, null, null);
+			this.registered = false;
+			this.discarded = false;
+		}
+
+		@Override
+		public void discardState() {
+			super.discardState();
+			Assert.assertFalse(discarded);
+			discarded = true;
+			registered = false;
+		}
+
+		@Override
+		public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
+			super.registerSharedStates(sharedStateRegistry);
+			Assert.assertFalse(discarded);
+			registered = true;
+		}
+
+		public void reset() {
+			registered = false;
+			discarded = false;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 589ff46..0bbb961 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -100,7 +100,6 @@ public class CompletedCheckpointTest {
 		checkpoint.discardOnSubsume(sharedStateRegistry);
 
 		verify(state, times(1)).discardState();
-		verify(state, times(1)).unregisterSharedStates(sharedStateRegistry);
 	}
 
 	/**
@@ -138,7 +137,6 @@ public class CompletedCheckpointTest {
 			checkpoint.discardOnShutdown(status, sharedStateRegistry);
 			verify(state, times(0)).discardState();
 			assertEquals(true, file.exists());
-			verify(state, times(0)).unregisterSharedStates(sharedStateRegistry);
 
 			// Discard
 			props = new CheckpointProperties(false, false, true, true, true, true, true);
@@ -152,7 +150,6 @@ public class CompletedCheckpointTest {
 
 			checkpoint.discardOnShutdown(status, sharedStateRegistry);
 			verify(state, times(1)).discardState();
-			verify(state, times(1)).unregisterSharedStates(sharedStateRegistry);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 6df01a0..a96b597 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -197,7 +197,6 @@ public class PendingCheckpointTest {
 
 		OperatorState state = mock(OperatorState.class);
 		doNothing().when(state).registerSharedStates(any(SharedStateRegistry.class));
-		doNothing().when(state).unregisterSharedStates(any(SharedStateRegistry.class));
 
 		String targetDir = tmpFolder.newFolder().getAbsolutePath();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 0d93289..44c802b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
@@ -100,11 +101,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
-		resetCheckpoint(expected[0].getOperatorStates().values());
-		resetCheckpoint(expected[1].getOperatorStates().values());
-		resetCheckpoint(expected[2].getOperatorStates().values());
-
-		// Recover TODO!!! clear registry!
+		// Recover
 		checkpoints.recover();
 
 		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index b63782d..f985573 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle.StateMetaInfo;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
@@ -273,18 +272,17 @@ public class CheckpointTestUtils {
 	private CheckpointTestUtils() {}
 
 
-	private static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
+	public static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
 		return new IncrementalKeyedStateHandle(
 			createRandomUUID(rnd).toString(),
 			new KeyGroupRange(1, 1),
 			42L,
-			createRandomOwnedHandleMap(rnd),
-			createRandomReferencedHandleMap(rnd),
-			createRandomOwnedHandleMap(rnd),
+			createRandomStateHandleMap(rnd),
+			createRandomStateHandleMap(rnd),
 			createDummyStreamStateHandle(rnd));
 	}
 
-	private static Map<StateHandleID, StreamStateHandle> createRandomOwnedHandleMap(Random rnd) {
+	public static Map<StateHandleID, StreamStateHandle> createRandomStateHandleMap(Random rnd) {
 		final int size = rnd.nextInt(4);
 		Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
 		for (int i = 0; i < size; ++i) {
@@ -296,24 +294,13 @@ public class CheckpointTestUtils {
 		return result;
 	}
 
-	private static Map<StateHandleID, StreamStateHandle> createRandomReferencedHandleMap(Random rnd) {
-		final int size = rnd.nextInt(4);
-		Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
-		for (int i = 0; i < size; ++i) {
-			StateHandleID randomId = new StateHandleID(createRandomUUID(rnd).toString());
-			result.put(randomId, new PlaceholderStreamStateHandle(rnd.nextInt(1024)));
-		}
-
-		return result;
-	}
-
-	private static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) {
+	public static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) {
 		return new KeyGroupsStateHandle(
 			new KeyGroupRangeOffsets(1, 1, new long[]{rnd.nextInt(1024)}),
 			createDummyStreamStateHandle(rnd));
 	}
 
-	private static StreamStateHandle createDummyStreamStateHandle(Random rnd) {
+	public static StreamStateHandle createDummyStreamStateHandle(Random rnd) {
 		return new TestByteStreamStateHandleDeepCompare(
 			String.valueOf(createRandomUUID(rnd)),
 			String.valueOf(createRandomUUID(rnd)).getBytes(ConfigConstants.DEFAULT_CHARSET));