You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/07/28 13:42:53 UTC

[1/2] flink git commit: [FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start

Repository: flink
Updated Branches:
  refs/heads/release-1.3 0225db288 -> 09caa9ffd


http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 77423c2..dc2b11e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -106,8 +107,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
 		// Recover
-		sharedStateRegistry.clear();
-		checkpoints.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		sharedStateRegistry = new SharedStateRegistry();
+		checkpoints.recover();
 
 		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
@@ -148,8 +150,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 		assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
-		sharedStateRegistry.clear();
-		store.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		store.recover();
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 	}
@@ -182,8 +184,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals("The checkpoint node should not be locked.", 0, stat.getNumChildren());
 
 		// Recover again
-		sharedStateRegistry.clear();
-		store.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		store.recover();
 
 		CompletedCheckpoint recovered = store.getLatestCheckpoint();
 		assertEquals(checkpoint, recovered);
@@ -209,8 +211,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 			checkpointStore.addCheckpoint(checkpoint);
 		}
 
-		sharedStateRegistry.clear();
-		checkpointStore.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		checkpointStore.recover();
 
 		CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint();
 
@@ -239,8 +241,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		zkCheckpointStore1.addCheckpoint(completedCheckpoint);
 
 		// recover the checkpoint by a different checkpoint store
-		sharedStateRegistry.clear();
-		zkCheckpointStore2.recover(sharedStateRegistry);
+		sharedStateRegistry.close();
+		sharedStateRegistry = new SharedStateRegistry();
+		zkCheckpointStore2.recover();
 
 		CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint();
 		assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint);

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 91bab85..3171f1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -52,7 +52,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -162,11 +161,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 			stateStorage,
 			Executors.directExecutor());
 
-		SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
-		zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
-
-		verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
-		verify(retrievableStateHandle2.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
+		zooKeeperCompletedCheckpointStore.recover();
 
 		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
index c1b3ccd..9f6f88e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -19,12 +19,15 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils;
+
 import org.junit.Test;
 
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
@@ -59,8 +62,6 @@ public class IncrementalKeyedStateHandleTest {
 	@Test
 	public void testSharedStateDeRegistration() throws Exception {
 
-		Random rnd = new Random(42);
-
 		SharedStateRegistry registry = spy(new SharedStateRegistry());
 
 		// Create two state handles with overlapping shared state
@@ -186,6 +187,76 @@ public class IncrementalKeyedStateHandleTest {
 		verify(stateHandle2.getMetaStateHandle(), times(1)).discardState();
 	}
 
+	/**
+	 * This tests that re-registration of shared state with another registry works as expected. This simulates a
+	 * recovery from a checkpoint, when the checkpoint coordinator creates a new shared state registry and re-registers
+	 * all live checkpoint states.
+	 */
+	@Test
+	public void testSharedStateReRegistration() throws Exception {
+
+		SharedStateRegistry stateRegistryA = spy(new SharedStateRegistry());
+
+		IncrementalKeyedStateHandle stateHandleX = create(new Random(1));
+		IncrementalKeyedStateHandle stateHandleY = create(new Random(2));
+		IncrementalKeyedStateHandle stateHandleZ = create(new Random(3));
+
+		// Now we register for the first time ...
+		stateHandleX.registerSharedStates(stateRegistryA);
+		stateHandleY.registerSharedStates(stateRegistryA);
+		stateHandleZ.registerSharedStates(stateRegistryA);
+
+		try {
+			// Second attempt should fail
+			stateHandleX.registerSharedStates(stateRegistryA);
+			fail("Should not be able to register twice with the same registry.");
+		} catch (IllegalStateException ignore) {
+		}
+
+		// Everything should be discarded for this handle
+		stateHandleZ.discardState();
+		verify(stateHandleZ.getMetaStateHandle(), times(1)).discardState();
+		for (StreamStateHandle stateHandle : stateHandleZ.getSharedState().values()) {
+			verify(stateHandle, times(1)).discardState();
+		}
+
+		// Close the first registry
+		stateRegistryA.close();
+
+		// Attempt to register to closed registry should trigger exception
+		try {
+			create(new Random(4)).registerSharedStates(stateRegistryA);
+			fail("Should not be able to register new state to closed registry.");
+		} catch (IllegalStateException ignore) {
+		}
+
+		// All state should still get discarded
+		stateHandleY.discardState();
+		verify(stateHandleY.getMetaStateHandle(), times(1)).discardState();
+		for (StreamStateHandle stateHandle : stateHandleY.getSharedState().values()) {
+			verify(stateHandle, times(1)).discardState();
+		}
+
+		// This should still be unaffected
+		verify(stateHandleX.getMetaStateHandle(), never()).discardState();
+		for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) {
+			verify(stateHandle, never()).discardState();
+		}
+
+		// We re-register the handle with a new registry
+		SharedStateRegistry sharedStateRegistryB = spy(new SharedStateRegistry());
+		stateHandleX.registerSharedStates(sharedStateRegistryB);
+		stateHandleX.discardState();
+
+		// Should be completely discarded because it is tracked through the new registry
+		verify(stateHandleX.getMetaStateHandle(), times(1)).discardState();
+		for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) {
+			verify(stateHandle, times(1)).discardState();
+		}
+
+		sharedStateRegistryB.close();
+	}
+
 	private static IncrementalKeyedStateHandle create(Random rnd) {
 		return new IncrementalKeyedStateHandle(
 			UUID.nameUUIDFromBytes("test".getBytes()),

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index a0c4412..037ecd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.testutils;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,14 +42,21 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 
 	private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2);
 
+	private final int maxRetainedCheckpoints;
+
+	public RecoverableCompletedCheckpointStore() {
+		this(1);
+	}
+
+	public RecoverableCompletedCheckpointStore(int maxRetainedCheckpoints) {
+		Preconditions.checkArgument(maxRetainedCheckpoints > 0);
+		this.maxRetainedCheckpoints = maxRetainedCheckpoints;
+	}
+
 	@Override
-	public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+	public void recover() throws Exception {
 		checkpoints.addAll(suspended);
 		suspended.clear();
-
-		for (CompletedCheckpoint checkpoint : checkpoints) {
-			checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
-		}
 	}
 
 	@Override
@@ -56,13 +64,16 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 
 		checkpoints.addLast(checkpoint);
 
-
-		if (checkpoints.size() > 1) {
-			CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
-			checkpointToSubsume.discardOnSubsume();
+		if (checkpoints.size() > maxRetainedCheckpoints) {
+			removeOldestCheckpoint();
 		}
 	}
 
+	public void removeOldestCheckpoint() throws Exception {
+		CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
+		checkpointToSubsume.discardOnSubsume();
+	}
+
 	@Override
 	public CompletedCheckpoint getLatestCheckpoint() throws Exception {
 		return checkpoints.isEmpty() ? null : checkpoints.getLast();
@@ -96,7 +107,7 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 
 	@Override
 	public int getMaxNumberOfRetainedCheckpoints() {
-		return 1;
+		return maxRetainedCheckpoints;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 3ce5a14..fb5f0e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -837,7 +837,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-
 	@Override
 	public String toString() {
 		return getName();

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 6ad7708..7c38d8d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -27,9 +27,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -47,19 +50,25 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -84,22 +93,35 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 	private static TestStreamEnvironment env;
 
+	private static TestingServer zkServer;
+
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();
 
 	private StateBackendEnum stateBackendEnum;
 	private AbstractStateBackend stateBackend;
 
-	AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) {
+	AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) throws IOException {
 		this.stateBackendEnum = stateBackendEnum;
 	}
 
 	enum StateBackendEnum {
-		MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, MEM_ASYNC, FILE_ASYNC
+		MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
 	}
 
-	@BeforeClass
-	public static void startTestCluster() {
+	@Before
+	public void startTestCluster() throws Exception {
+
+		// Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
+		if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
+			zkServer = new TestingServer();
+			zkServer.start();
+		}
+
+		TemporaryFolder temporaryFolder = new TemporaryFolder();
+		temporaryFolder.create();
+		final File haDir = temporaryFolder.newFolder();
+
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
@@ -107,21 +129,28 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
 		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
 
-		cluster = new LocalFlinkMiniCluster(config, false);
+		if (zkServer != null) {
+			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+		}
+
+		// purposefully delay every command by 500 ms in the executor to tease out races
+		final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
+		HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+			config,
+			new Executor() {
+				@Override
+				public void execute(Runnable command) {
+					executor.schedule(command, 500, MILLISECONDS);
+				}
+			});
+
+		cluster = new LocalFlinkMiniCluster(config, haServices, false);
 		cluster.start();
 
 		env = new TestStreamEnvironment(cluster, PARALLELISM);
-	}
-
-	@AfterClass
-	public static void stopTestCluster() {
-		if (cluster != null) {
-			cluster.stop();
-		}
-	}
 
-	@Before
-	public void initStateBackend() throws IOException {
 		switch (stateBackendEnum) {
 			case MEM:
 				this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
@@ -146,7 +175,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 				this.stateBackend = rdb;
 				break;
 			}
-			case ROCKSDB_INCREMENTAL: {
+			case ROCKSDB_INCREMENTAL:
+			case ROCKSDB_INCREMENTAL_ZK: {
 				String rocksDb = tempFolder.newFolder().getAbsolutePath();
 				String backups = tempFolder.newFolder().getAbsolutePath();
 				// we use the fs backend with small threshold here to test the behaviour with file
@@ -160,7 +190,21 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 				this.stateBackend = rdb;
 				break;
 			}
+			default:
+				throw new IllegalStateException("No backend selected.");
+		}
+	}
+
+	@After
+	public void stopTestCluster() throws IOException {
+		if (cluster != null) {
+			cluster.stop();
+			cluster = null;
+		}
 
+		if (zkServer != null) {
+			zkServer.stop();
+			zkServer = null;
 		}
 	}
 
@@ -172,7 +216,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		final int WINDOW_SIZE = windowSize();
 		final int NUM_KEYS = numKeys();
 		FailingSource.reset();
-		
+
 		try {
 			env.setParallelism(PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -506,7 +550,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		}
 	}
 
-
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -667,7 +710,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
 			assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
 
-
 			Integer curr = windowCounts.get(value.f0);
 			if (curr != null) {
 				windowCounts.put(value.f0, curr + 1);
@@ -754,7 +796,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 				windowCounts.put(value.f0, 1);
 			}
 
-
 			// verify the contents of that window, the contents should be:
 			// (key + num windows so far)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
index a5bf10c..9abbddd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
 
-	public AsyncFileBackendEventTimeWindowCheckpointingITCase() {
+	public AsyncFileBackendEventTimeWindowCheckpointingITCase() throws IOException {
 		super(StateBackendEnum.FILE_ASYNC);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
index ef9ad37..62041a5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
 
-	public AsyncMemBackendEventTimeWindowCheckpointingITCase() {
+	public AsyncMemBackendEventTimeWindowCheckpointingITCase() throws IOException {
 		super(StateBackendEnum.MEM_ASYNC);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
index 65fda09..3111f05 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
 
-	public FileBackendEventTimeWindowCheckpointingITCase() {
+	public FileBackendEventTimeWindowCheckpointingITCase() throws IOException {
 		super(StateBackendEnum.FILE);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..8e23909
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import java.io.IOException;
+
+/**
+ * Integration tests for the incremental RocksDB backend in a ZooKeeper-based HA setup.
+ */
+public class HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+	public HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() throws IOException {
+		super(StateBackendEnum.ROCKSDB_INCREMENTAL_ZK);
+	}
+
+	@Override
+	protected int numElementsPerKey() {
+		return 3000;
+	}
+
+	@Override
+	protected int windowSize() {
+		return 1000;
+	}
+
+	@Override
+	protected int windowSlide() {
+		return 100;
+	}
+
+	@Override
+	protected int numKeys() {
+		return 100;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
index 352f9f7..2cdfbe7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
 
-	public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() {
+	public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() throws IOException {
 		super(StateBackendEnum.ROCKSDB_INCREMENTAL);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
index 899b8d6..701b746 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
 
-	public MemBackendEventTimeWindowCheckpointingITCase() {
+	public MemBackendEventTimeWindowCheckpointingITCase() throws IOException {
 		super(StateBackendEnum.MEM);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
index da2bbc7..b7cbaa9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
 
-	public RocksDbBackendEventTimeWindowCheckpointingITCase() {
+	public RocksDbBackendEventTimeWindowCheckpointingITCase() throws IOException {
 		super(StateBackendEnum.ROCKSDB_FULLY_ASYNC);
 	}
 


[2/2] flink git commit: [FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start

Posted by sr...@apache.org.
[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09caa9ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09caa9ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09caa9ff

Branch: refs/heads/release-1.3
Commit: 09caa9ffdc8168610c7d0260360c034ea87f904c
Parents: 0225db2
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Jul 25 12:04:16 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jul 28 15:42:28 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  13 +-
 .../checkpoint/CheckpointCoordinator.java       |  32 +-
 .../runtime/checkpoint/CompletedCheckpoint.java |   3 +-
 .../checkpoint/CompletedCheckpointStore.java    |   5 +-
 .../StandaloneCompletedCheckpointStore.java     |   4 +-
 .../ZooKeeperCompletedCheckpointStore.java      |  12 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   6 +-
 .../state/IncrementalKeyedStateHandle.java      |  68 ++-
 .../runtime/state/KeyGroupsStateHandle.java     |   2 +-
 .../runtime/state/MultiStreamStateHandle.java   |  10 +-
 .../runtime/state/SharedStateRegistry.java      |  52 ++-
 .../state/SharedStateRegistryFactory.java       |  35 ++
 .../state/memory/ByteStreamStateHandle.java     |   1 +
 ...tCoordinatorExternalizedCheckpointsTest.java |  22 +-
 .../CheckpointCoordinatorFailureTest.java       |   7 +-
 .../CheckpointCoordinatorMasterHooksTest.java   |   7 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 437 ++++++++++---------
 .../checkpoint/CheckpointStateRestoreTest.java  |  10 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  25 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |   7 +-
 .../state/IncrementalKeyedStateHandleTest.java  |  75 +++-
 .../RecoverableCompletedCheckpointStore.java    |  33 +-
 .../streaming/runtime/tasks/StreamTask.java     |   1 -
 ...tractEventTimeWindowCheckpointingITCase.java |  85 +++-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |  51 +++
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 31 files changed, 688 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a6b53ec..7e0910e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -231,7 +231,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.restoredKvStateMetaInfos = new HashMap<>();
 		this.materializedSstFiles = new TreeMap<>();
 		this.backendUID = UUID.randomUUID();
-		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
+
+		LOG.debug("Setting initial backend ID in RocksDBKeyedStateBackend for operator {} to {}.",
+			this.operatorIdentifier,
+			this.backendUID);
 	}
 
 	/**
@@ -835,11 +838,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		void takeSnapshot() throws Exception {
 			assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
 
+			final long lastCompletedCheckpoint;
+
 			// use the last completed checkpoint as the comparison base.
 			synchronized (stateBackend.materializedSstFiles) {
-				baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+				lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
+				baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
 			}
 
+			LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
+				"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+
 			// save meta data
 			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
 					: stateBackend.kvStateInformation.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 82933ac..fe94d25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.util.Preconditions;
@@ -174,8 +175,11 @@ public class CheckpointCoordinator {
 	@Nullable
 	private CheckpointStatsTracker statsTracker;
 
+	/** A factory for SharedStateRegistry objects */
+	private final SharedStateRegistryFactory sharedStateRegistryFactory;
+
 	/** Registry that tracks state which is shared across (incremental) checkpoints */
-	private final SharedStateRegistry sharedStateRegistry;
+	private SharedStateRegistry sharedStateRegistry;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -192,7 +196,8 @@ public class CheckpointCoordinator {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			@Nullable String checkpointDirectory,
-			Executor executor) {
+			Executor executor,
+			SharedStateRegistryFactory sharedStateRegistryFactory) {
 
 		// sanity checks
 		checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
@@ -230,7 +235,8 @@ public class CheckpointCoordinator {
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
 		this.checkpointDirectory = checkpointDirectory;
 		this.executor = checkNotNull(executor);
-		this.sharedStateRegistry = new SharedStateRegistry(executor);
+		this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
+		this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
 
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 		this.masterHooks = new HashMap<>();
@@ -1044,10 +1050,23 @@ public class CheckpointCoordinator {
 				throw new IllegalStateException("CheckpointCoordinator is shut down");
 			}
 
-			// Recover the checkpoints
-			completedCheckpointStore.recover(sharedStateRegistry);
+			// We create a new shared state registry object, so that all pending async disposal requests from previous
+			// runs will go against the old object (where they can do no harm).
+			// This must happen under the checkpoint lock.
+			sharedStateRegistry.close();
+			sharedStateRegistry = sharedStateRegistryFactory.create(executor);
+
+			// Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
+			completedCheckpointStore.recover();
+
+			// Now, we re-register all (shared) states from the checkpoint store with the new registry
+			for (CompletedCheckpoint completedCheckpoint : completedCheckpointStore.getAllCheckpoints()) {
+				completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+			}
+
+			LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
 
-			// restore from the latest checkpoint
+			// Restore from the latest checkpoint
 			CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
 
 			if (latest == null) {
@@ -1121,7 +1140,6 @@ public class CheckpointCoordinator {
 		CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
 				job, tasks, savepointPath, userClassLoader, allowNonRestored);
 
-		savepoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 		completedCheckpointStore.addCheckpoint(savepoint);
 		
 		// Reset the checkpoint ID counter

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 56aa19d..76d1580 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -209,6 +209,8 @@ public class CompletedCheckpoint implements Serializable {
 
 	private void doDiscard() throws Exception {
 
+		LOG.trace("Executing discard procedure for {}.", this);
+
 		try {
 			// collect exceptions and continue cleanup
 			Exception exception = null;
@@ -225,7 +227,6 @@ public class CompletedCheckpoint implements Serializable {
 			// discard private state objects
 			try {
 				Collection<OperatorState> values = operatorStates.values();
-				LOG.trace("About to discard operator states {}.", values);
 				StateUtil.bestEffortDiscardAllStateObjects(values);
 			} catch (Exception e) {
 				exception = ExceptionUtils.firstOrSuppressed(e, exception);

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 45d407e..82193b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import java.util.List;
 
@@ -33,10 +32,8 @@ public interface CompletedCheckpointStore {
 	 *
 	 * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
 	 * available checkpoint.
-	 *
-	 * @param sharedStateRegistry the shared state registry to register recovered states.
 	 */
-	void recover(SharedStateRegistry sharedStateRegistry) throws Exception;
+	void recover() throws Exception;
 
 	/**
 	 * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index fbb0198..63e7468 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +57,7 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 	}
 
 	@Override
-	public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+	public void recover() throws Exception {
 		// Nothing to do
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index c4cb6bc..88dd0d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -18,20 +18,21 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -138,14 +139,13 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 * that the history of checkpoints is consistent.
 	 */
 	@Override
-	public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+	public void recover() throws Exception {
 		LOG.info("Recovering checkpoints from ZooKeeper.");
 
 		// Clear local handles in order to prevent duplicates on
 		// recovery. The local handles should reflect the state
 		// of ZooKeeper.
 		completedCheckpoints.clear();
-		sharedStateRegistry.clear();
 
 		// Get all there is first
 		List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
@@ -170,8 +170,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			try {
 				completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
 				if (completedCheckpoint != null) {
-					// Re-register all shared states in the checkpoint.
-					completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 					completedCheckpoints.add(completedCheckpoint);
 				}
 			} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f9d2d69..c105d2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -66,6 +66,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
@@ -74,8 +75,8 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.util.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -459,7 +460,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			checkpointIDCounter,
 			checkpointStore,
 			checkpointDir,
-			ioExecutor);
+			ioExecutor,
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// register the master hooks on the checkpoint coordinator
 		for (MasterTriggerRestoreHook<?> hook : masterHooks) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 0085890..0268b10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -65,27 +65,27 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	private final UUID backendIdentifier;
 
 	/**
-	 * The key-group range covered by this state handle
+	 * The key-group range covered by this state handle.
 	 */
 	private final KeyGroupRange keyGroupRange;
 
 	/**
-	 * The checkpoint Id
+	 * The checkpoint Id.
 	 */
 	private final long checkpointId;
 
 	/**
-	 * Shared state in the incremental checkpoint. This i
+	 * Shared state in the incremental checkpoint.
 	 */
 	private final Map<StateHandleID, StreamStateHandle> sharedState;
 
 	/**
-	 * Private state in the incremental checkpoint
+	 * Private state in the incremental checkpoint.
 	 */
 	private final Map<StateHandleID, StreamStateHandle> privateState;
 
 	/**
-	 * Primary meta data state of the incremental checkpoint
+	 * Primary meta data state of the incremental checkpoint.
 	 */
 	private final StreamStateHandle metaStateHandle;
 
@@ -143,16 +143,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 
 	@Override
 	public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
-		if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
-			return this;
-		} else {
-			return null;
-		}
+		return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ?
+			null : this;
 	}
 
 	@Override
 	public void discardState() throws Exception {
 
+		SharedStateRegistry registry = this.sharedStateRegistry;
+		final boolean isRegistered = (registry != null);
+
+		LOG.trace("Discarding IncrementalKeyedStateHandle (registered = {}) for checkpoint {} from backend with id {}.",
+			isRegistered,
+			checkpointId,
+			backendIdentifier);
+
 		try {
 			metaStateHandle.discardState();
 		} catch (Exception e) {
@@ -168,19 +173,20 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		// If this was not registered, we can delete the shared state. We can simply apply this
 		// to all handles, because all handles that have not been created for the first time for this
 		// are only placeholders at this point (disposing them is a NOP).
-		if (sharedStateRegistry == null) {
-			try {
-				StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
-			} catch (Exception e) {
-				LOG.warn("Could not properly discard new sst file states.", e);
-			}
-		} else {
+		if (isRegistered) {
 			// If this was registered, we only unregister all our referenced shared states
 			// from the registry.
 			for (StateHandleID stateHandleID : sharedState.keySet()) {
-				sharedStateRegistry.unregisterReference(
+				registry.unregisterReference(
 					createSharedStateRegistryKeyFromFileName(stateHandleID));
 			}
+		} else {
+			// Otherwise, we assume to own those handles and dispose them directly.
+			try {
+				StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+			} catch (Exception e) {
+				LOG.warn("Could not properly discard new sst file states.", e);
+			}
 		}
 	}
 
@@ -202,10 +208,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	@Override
 	public void registerSharedStates(SharedStateRegistry stateRegistry) {
 
-		Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states.");
+		// This is a quick check to avoid registering twice with the same registry. However, the code does allow
+		// registering again with a different registry. The implication is that ownership is transferred to this new
+		// registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new
+		// SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that
+		// an old registry object from a previous run is due to be GCed and will never be used for registration again.
+		Preconditions.checkState(
+			sharedStateRegistry != stateRegistry,
+			"The state handle has already registered its shared states to the given registry.");
 
 		sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
 
+		LOG.trace("Registering IncrementalKeyedStateHandle for checkpoint {} from backend with id {}.",
+			checkpointId,
+			backendIdentifier);
+
 		for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
 			SharedStateRegistryKey registryKey =
 				createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
@@ -284,5 +301,18 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		result = 31 * result + getMetaStateHandle().hashCode();
 		return result;
 	}
+
+	@Override
+	public String toString() {
+		return "IncrementalKeyedStateHandle{" +
+			"backendIdentifier=" + backendIdentifier +
+			", keyGroupRange=" + keyGroupRange +
+			", checkpointId=" + checkpointId +
+			", sharedState=" + sharedState +
+			", privateState=" + privateState +
+			", metaStateHandle=" + metaStateHandle +
+			", registered=" + (sharedStateRegistry != null) +
+			'}';
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 8e38ad4..8092f6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -141,7 +141,7 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle
 	public String toString() {
 		return "KeyGroupsStateHandle{" +
 				"groupRangeOffsets=" + groupRangeOffsets +
-				", data=" + stateHandle +
+				", stateHandle=" + stateHandle +
 				'}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
index b95dace..1960c1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
@@ -38,7 +38,7 @@ public class MultiStreamStateHandle implements StreamStateHandle {
 	private final List<StreamStateHandle> stateHandles;
 	private final long stateSize;
 
-	public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) throws IOException {
+	public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) {
 		this.stateHandles = Preconditions.checkNotNull(stateHandles);
 		long calculateSize = 0L;
 		for(StreamStateHandle stateHandle : stateHandles) {
@@ -62,6 +62,14 @@ public class MultiStreamStateHandle implements StreamStateHandle {
 		return stateSize;
 	}
 
+	@Override
+	public String toString() {
+		return "MultiStreamStateHandle{" +
+			"stateHandles=" + stateHandles +
+			", stateSize=" + stateSize +
+			'}';
+	}
+
 	static final class MultiFSDataInputStream extends AbstractMultiFSDataInputStream {
 
 		private final TreeMap<Long, StreamStateHandle> stateHandleMap;

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index e0ca873..347f30c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -38,13 +38,24 @@ import java.util.concurrent.Executor;
  * maintain the reference count of {@link StreamStateHandle}s by a key that (logically) identifies
  * them.
  */
-public class SharedStateRegistry {
+public class SharedStateRegistry implements AutoCloseable {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
 
+	/** A singleton object for the default implementation of a {@link SharedStateRegistryFactory} */
+	public static final SharedStateRegistryFactory DEFAULT_FACTORY = new SharedStateRegistryFactory() {
+		@Override
+		public SharedStateRegistry create(Executor deleteExecutor) {
+			return new SharedStateRegistry(deleteExecutor);
+		}
+	};
+
 	/** All registered state objects by an artificial key */
 	private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates;
 
+	/** This flag indicates whether the registry is still open or close() has already been called */
+	private boolean open;
+
 	/** Executor for async state deletion */
 	private final Executor asyncDisposalExecutor;
 
@@ -56,6 +67,7 @@ public class SharedStateRegistry {
 	public SharedStateRegistry(Executor asyncDisposalExecutor) {
 		this.registeredStates = new HashMap<>();
 		this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
+		this.open = true;
 	}
 
 	/**
@@ -82,6 +94,9 @@ public class SharedStateRegistry {
 		SharedStateRegistry.SharedStateEntry entry;
 
 		synchronized (registeredStates) {
+
+			Preconditions.checkState(open, "Attempt to register state to closed SharedStateRegistry.");
+
 			entry = registeredStates.get(registrationKey);
 
 			if (entry == null) {
@@ -96,6 +111,11 @@ public class SharedStateRegistry {
 				// delete if this is a real duplicate
 				if (!Objects.equals(state, entry.stateHandle)) {
 					scheduledStateDeletion = state;
+					LOG.trace("Identified duplicate state registration under key {}. New state {} was determined to " +
+							"be an unnecessary copy of existing state {} and will be dropped.",
+						registrationKey,
+						state,
+						entry.stateHandle);
 				}
 				entry.increaseReferenceCount();
 			}
@@ -112,7 +132,8 @@ public class SharedStateRegistry {
 	 *
 	 * @param registrationKey the shared state for which we release a reference.
 	 * @return the result of the request, consisting of the reference count after this operation
-	 * and the state handle, or null if the state handle was deleted through this request.
+	 * and the state handle, or null if the state handle was deleted through this request. Returns null if the registry
+	 * was previously closed.
 	 */
 	public Result unregisterReference(SharedStateRegistryKey registrationKey) {
 
@@ -123,6 +144,7 @@ public class SharedStateRegistry {
 		SharedStateRegistry.SharedStateEntry entry;
 
 		synchronized (registeredStates) {
+
 			entry = registeredStates.get(registrationKey);
 
 			Preconditions.checkState(entry != null,
@@ -164,10 +186,18 @@ public class SharedStateRegistry {
 		}
 	}
 
+	@Override
+	public String toString() {
+		synchronized (registeredStates) {
+			return "SharedStateRegistry{" +
+				"registeredStates=" + registeredStates +
+				'}';
+		}
+	}
+
 	private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
 		// We do the small optimization to not issue discards for placeholders, which are NOPs.
 		if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
-
 			LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
 			asyncDisposalExecutor.execute(
 				new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
@@ -178,6 +208,13 @@ public class SharedStateRegistry {
 		return stateHandle instanceof PlaceholderStreamStateHandle;
 	}
 
+	@Override
+	public void close() {
+		synchronized (registeredStates) {
+			open = false;
+		}
+	}
+
 	/**
 	 * An entry in the registry, tracking the handle and the corresponding reference count.
 	 */
@@ -279,13 +316,4 @@ public class SharedStateRegistry {
 			}
 		}
 	}
-
-	/**
-	 * Clears the registry.
-	 */
-	public void clear() {
-		synchronized (registeredStates) {
-			registeredStates.clear();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
new file mode 100644
index 0000000..05c9825
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Simple factory to produce {@link SharedStateRegistry} objects.
+ */
+public interface SharedStateRegistryFactory {
+
+	/**
+	 * Factory method for {@link SharedStateRegistry}.
+	 *
+	 * @param deleteExecutor executor used to run (async) deletes.
+	 * @return a SharedStateRegistry object
+	 */
+	SharedStateRegistry create(Executor deleteExecutor);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 9ba9d35..3a43d4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -95,6 +95,7 @@ public class ByteStreamStateHandle implements StreamStateHandle {
 	public String toString() {
 		return "ByteStreamStateHandle{" +
 			"handleName='" + handleName + '\'' +
+			", dataBytes=" + data.length +
 			'}';
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
index d293eea..edc29fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -37,11 +29,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 /**
  * CheckpointCoordinator tests for externalized checkpoints.
  *
@@ -91,7 +94,8 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			checkpointDir.getAbsolutePath(),
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 344b340..5cca94f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -78,7 +78,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new FailingCompletedCheckpointStore(),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		coord.triggerCheckpoint(triggerTimestamp, false);
 
@@ -113,7 +114,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 		when(subtaskState.getManagedKeyedState()).thenReturn(managedRawHandle);
 		
 		AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new CheckpointMetrics(), subtaskState);
-		
+
 		try {
 			coord.receiveAcknowledgeMessage(acknowledgeMessage);
 			fail("Expected a checkpoint exception because the completed checkpoint store could not " +
@@ -136,7 +137,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 	private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
 
 		@Override
-		public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+		public void recover() throws Exception {
 			throw new UnsupportedOperationException("Not implemented.");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index d6daa4e..94063a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -30,9 +30,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -47,14 +47,12 @@ import java.util.List;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.any;
@@ -405,7 +403,8 @@ public class CheckpointCoordinatorMasterHooksTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 	}
 
 	private static <T> T mockGeneric(Class<?> clazz) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 186a819..16a89ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -56,6 +54,9 @@ import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -140,7 +141,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -200,7 +202,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -251,7 +254,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -303,7 +307,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -407,7 +412,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -526,7 +532,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -698,7 +705,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -828,7 +836,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -992,7 +1001,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// trigger a checkpoint, partially acknowledged
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1019,8 +1029,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				Thread.sleep(250);
 			}
 			while (!checkpoint.isDiscarded() &&
-					coord.getNumberOfPendingCheckpoints() > 0 &&
-					System.currentTimeMillis() < deadline);
+				coord.getNumberOfPendingCheckpoints() > 0 &&
+				System.currentTimeMillis() < deadline);
 
 			assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -1071,7 +1081,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1134,7 +1145,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1274,7 +1286,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 
 			coord.startCheckpointScheduler();
@@ -1296,7 +1309,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			int numCallsSoFar = numCalls.get();
 			Thread.sleep(400);
 			assertTrue(numCallsSoFar == numCalls.get() ||
-					numCallsSoFar+1 == numCalls.get());
+				numCallsSoFar+1 == numCalls.get());
 
 			// start another sequence of periodic scheduling
 			numCalls.set(0);
@@ -1318,7 +1331,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			numCallsSoFar = numCalls.get();
 			Thread.sleep(400);
 			assertTrue(numCallsSoFar == numCalls.get() ||
-					numCallsSoFar + 1 == numCalls.get());
+				numCallsSoFar + 1 == numCalls.get());
 
 			coord.shutdown(JobStatus.FINISHED);
 		}
@@ -1354,19 +1367,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		final long delay = 50;
 
 		final CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				2,           // periodic interval is 2 ms
-				200_000,     // timeout is very long (200 s)
-				delay,       // 50 ms delay between checkpoints
-				1,
-				ExternalizedCheckpointSettings.none(),
-				new ExecutionVertex[] { vertex },
-				new ExecutionVertex[] { vertex },
-				new ExecutionVertex[] { vertex },
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(2),
-				"dummy-path",
-				Executors.directExecutor());
+			jid,
+			2,           // periodic interval is 2 ms
+			200_000,     // timeout is very long (200 s)
+			delay,       // 50 ms delay between checkpoints
+			1,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[] { vertex },
+			new ExecutionVertex[] { vertex },
+			new ExecutionVertex[] { vertex },
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(2),
+			"dummy-path",
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		try {
 			coord.startCheckpointScheduler();
@@ -1439,7 +1453,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1596,7 +1611,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			counter,
 			new StandaloneCompletedCheckpointStore(10),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1702,7 +1718,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			coord.startCheckpointScheduler();
 
@@ -1715,12 +1732,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				Thread.sleep(20);
 			}
 			while ((now = System.currentTimeMillis()) < minDuration ||
-					(numCalls.get() < maxConcurrentAttempts && now < timeout));
+				(numCalls.get() < maxConcurrentAttempts && now < timeout));
 
 			assertEquals(maxConcurrentAttempts, numCalls.get());
 
 			verify(triggerVertex.getCurrentExecutionAttempt(), times(maxConcurrentAttempts))
-					.triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
+				.triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
 
 			// now, once we acknowledge one checkpoint, it should trigger the next one
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
@@ -1775,7 +1792,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			coord.startCheckpointScheduler();
 
@@ -1788,7 +1806,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				Thread.sleep(20);
 			}
 			while ((now = System.currentTimeMillis()) < minDuration ||
-					(coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
+				(coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
 
 			// validate that the pending checkpoints are there
 			assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
@@ -1806,7 +1824,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				Thread.sleep(20);
 			}
 			while (coord.getPendingCheckpoints().get(4L) == null &&
-					System.currentTimeMillis() < newTimeout);
+				System.currentTimeMillis() < newTimeout);
 
 			// do the final check
 			assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
@@ -1837,12 +1855,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			final AtomicReference<ExecutionState> currentState = new AtomicReference<>(ExecutionState.CREATED);
 			when(triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer(
-					new Answer<ExecutionState>() {
-						@Override
-						public ExecutionState answer(InvocationOnMock invocation){
-							return currentState.get();
-						}
-					});
+				new Answer<ExecutionState>() {
+					@Override
+					public ExecutionState answer(InvocationOnMock invocation){
+						return currentState.get();
+					}
+				});
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
 				jid,
@@ -1857,7 +1875,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			coord.startCheckpointScheduler();
 
@@ -1874,7 +1893,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				Thread.sleep(20);
 			}
 			while (System.currentTimeMillis() < timeout &&
-					coord.getNumberOfPendingCheckpoints() == 0);
+				coord.getNumberOfPendingCheckpoints() == 0);
 
 			assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
 		}
@@ -1909,7 +1928,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			checkpointIDCounter,
 			new StandaloneCompletedCheckpointStore(2),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
 
@@ -1962,7 +1982,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(2),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -2006,7 +2027,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
 		ExecutionVertex[] arrayExecutionVertices =
-				allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+			allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		CompletedCheckpointStore store = new RecoverableCompletedCheckpointStore();
 
@@ -2024,7 +2045,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			store,
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2051,11 +2073,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			SubtaskState subtaskState = mockSubtaskState(jobVertexID1, index, keyGroupPartitions1.get(index));
 
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					subtaskState);
+				jid,
+				jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				subtaskState);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2064,11 +2086,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			SubtaskState subtaskState = mockSubtaskState(jobVertexID2, index, keyGroupPartitions2.get(index));
 
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					subtaskState);
+				jid,
+				jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				subtaskState);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2150,7 +2172,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2167,11 +2190,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
 			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					checkpointStateHandles);
+				jid,
+				jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2182,11 +2205,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
 			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					checkpointStateHandles);
+				jid,
+				jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2251,7 +2274,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
 		ExecutionVertex[] arrayExecutionVertices =
-				allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+			allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2267,7 +2290,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2277,22 +2301,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		List<KeyGroupRange> keyGroupPartitions1 =
-				StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
 		List<KeyGroupRange> keyGroupPartitions2 =
-				StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
 		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
 			ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index);
 			KeyGroupsStateHandle keyGroupState = generateKeyGroupState(
-					jobVertexID1, keyGroupPartitions1.get(index), false);
+				jobVertexID1, keyGroupPartitions1.get(index), false);
 
 			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					checkpointStateHandles);
+				jid,
+				jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2302,15 +2326,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			ChainedStateHandle<StreamStateHandle> state = generateStateForVertex(jobVertexID2, index);
 			KeyGroupsStateHandle keyGroupState = generateKeyGroupState(
-					jobVertexID2, keyGroupPartitions2.get(index), false);
+				jobVertexID2, keyGroupPartitions2.get(index), false);
 
 			SubtaskState checkpointStateHandles = new SubtaskState(state, null, null, keyGroupState, null);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					checkpointStateHandles);
+				jid,
+				jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2390,13 +2414,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		int newParallelism2 = scaleOut ? 13 : 2;
 
 		final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(
-				jobVertexID1,
-				parallelism1,
-				maxParallelism1);
+			jobVertexID1,
+			parallelism1,
+			maxParallelism1);
 		final ExecutionJobVertex jobVertex2 = mockExecutionJobVertex(
-				jobVertexID2,
-				parallelism2,
-				maxParallelism2);
+			jobVertexID2,
+			parallelism2,
+			maxParallelism2);
 
 		List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1 + parallelism2);
 
@@ -2404,7 +2428,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
 		ExecutionVertex[] arrayExecutionVertices =
-				allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+			allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
 		// set up the coordinator and validate the initial state
 		CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2420,7 +2444,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger the checkpoint
 		coord.triggerCheckpoint(timestamp, false);
@@ -2430,9 +2455,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
 		List<KeyGroupRange> keyGroupPartitions1 =
-				StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
 		List<KeyGroupRange> keyGroupPartitions2 =
-				StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
 		//vertex 1
 		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
@@ -2443,11 +2468,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, keyedStateRaw);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					checkpointStateHandles);
+				jid,
+				jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2463,14 +2488,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			expectedOpStatesBackend.add(opStateBackend);
 			expectedOpStatesRaw.add(opStateRaw);
 			SubtaskState checkpointStateHandles =
-					new SubtaskState(new ChainedStateHandle<>(
-							Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw);
+				new SubtaskState(new ChainedStateHandle<>(
+					Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw);
 			AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-					jid,
-					jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-					checkpointId,
-					new CheckpointMetrics(),
-					checkpointStateHandles);
+				jid,
+				jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+				checkpointId,
+				new CheckpointMetrics(),
+				checkpointStateHandles);
 
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
 		}
@@ -2482,18 +2507,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
 
 		List<KeyGroupRange> newKeyGroupPartitions2 =
-				StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
 
 		final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
-				jobVertexID1,
-				parallelism1,
-				maxParallelism1);
+			jobVertexID1,
+			parallelism1,
+			maxParallelism1);
 
 		// rescale vertex 2
 		final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex(
-				jobVertexID2,
-				newParallelism2,
-				maxParallelism2);
+			jobVertexID2,
+			newParallelism2,
+			maxParallelism2);
 
 		tasks.put(jobVertexID1, newJobVertex1);
 		tasks.put(jobVertexID2, newJobVertex2);
@@ -2534,7 +2559,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
 		return new Tuple2<>(jobVertexID, operatorID);
 	}
-	
+
 	/**
 	 * old topology
 	 * [operator1,operator2] * parallelism1 -> [operator3,operator4] * parallelism2
@@ -2575,7 +2600,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			OperatorState taskState = new OperatorState(id.f1, parallelism1, maxParallelism1);
 			operatorStates.put(id.f1, taskState);
 			for (int index = 0; index < taskState.getParallelism(); index++) {
-				StreamStateHandle subNonPartitionedState = 
+				StreamStateHandle subNonPartitionedState =
 					generateStateForVertex(id.f0, index)
 						.get(0);
 				OperatorStateHandle subManagedOperatorState =
@@ -2673,15 +2698,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			spy(new StandaloneCompletedCheckpointStore(1));
 
 		CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(
-				jobID,
-				2,
-				System.currentTimeMillis(),
-				System.currentTimeMillis() + 3000,
-				operatorStates,
-				Collections.<MasterState>emptyList(),
-				CheckpointProperties.forStandardCheckpoint(),
-				null,
-				null);
+			jobID,
+			2,
+			System.currentTimeMillis(),
+			System.currentTimeMillis() + 3000,
+			operatorStates,
+			Collections.<MasterState>emptyList(),
+			CheckpointProperties.forStandardCheckpoint(),
+			null,
+			null);
 
 		when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint);
 
@@ -2699,7 +2724,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			standaloneCompletedCheckpointStore,
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		coord.restoreLatestCheckpointedState(tasks, false, true);
 
@@ -2832,7 +2858,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				"fake-directory",
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -2862,14 +2889,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
 		List<Collection<OperatorStateHandle>> repartitionedStates =
-				repartitioner.repartitionState(Collections.singletonList(osh), 3);
+			repartitioner.repartitionState(Collections.singletonList(osh), 3);
 
 		Map<String, Integer> checkCounts = new HashMap<>(3);
 
 		for (Collection<OperatorStateHandle> operatorStateHandles : repartitionedStates) {
 			for (OperatorStateHandle operatorStateHandle : operatorStateHandles) {
 				for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> stateNameToMetaInfo :
-						operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
+					operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
 
 					String stateName = stateNameToMetaInfo.getKey();
 					Integer count = checkCounts.get(stateName);
@@ -2900,8 +2927,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	public static KeyGroupsStateHandle generateKeyGroupState(
-			JobVertexID jobVertexID,
-			KeyGroupRange keyGroupPartition, boolean rawState) throws IOException {
+		JobVertexID jobVertexID,
+		KeyGroupRange keyGroupPartition, boolean rawState) throws IOException {
 
 		List<Integer> testStatesLists = new ArrayList<>(keyGroupPartition.getNumberOfKeyGroups());
 
@@ -2918,27 +2945,27 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	public static KeyGroupsStateHandle generateKeyGroupState(
-			KeyGroupRange keyGroupRange,
-			List<? extends Serializable> states) throws IOException {
+		KeyGroupRange keyGroupRange,
+		List<? extends Serializable> states) throws IOException {
 
 		Preconditions.checkArgument(keyGroupRange.getNumberOfKeyGroups() == states.size());
 
 		Tuple2<byte[], List<long[]>> serializedDataWithOffsets =
-				serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(states));
+			serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(states));
 
 		KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, serializedDataWithOffsets.f1.get(0));
 
 		ByteStreamStateHandle allSerializedStatesHandle = new TestByteStreamStateHandleDeepCompare(
-				String.valueOf(UUID.randomUUID()),
-				serializedDataWithOffsets.f0);
+			String.valueOf(UUID.randomUUID()),
+			serializedDataWithOffsets.f0);
 		KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(
-				keyGroupRangeOffsets,
-				allSerializedStatesHandle);
+			keyGroupRangeOffsets,
+			allSerializedStatesHandle);
 		return keyGroupsStateHandle;
 	}
 
 	public static Tuple2<byte[], List<long[]>> serializeTogetherAndTrackOffsets(
-			List<List<? extends Serializable>> serializables) throws IOException {
+		List<List<? extends Serializable>> serializables) throws IOException {
 
 		List<long[]> offsets = new ArrayList<>(serializables.size());
 		List<byte[]> serializedGroupValues = new ArrayList<>();
@@ -2962,19 +2989,19 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		runningGroupsOffset = 0;
 		for (byte[] serializedGroupValue : serializedGroupValues) {
 			System.arraycopy(
-					serializedGroupValue,
-					0,
-					allSerializedValuesConcatenated,
-					runningGroupsOffset,
-					serializedGroupValue.length);
+				serializedGroupValue,
+				0,
+				allSerializedValuesConcatenated,
+				runningGroupsOffset,
+				serializedGroupValue.length);
 			runningGroupsOffset += serializedGroupValue.length;
 		}
 		return new Tuple2<>(allSerializedValuesConcatenated, offsets);
 	}
 
 	public static ChainedStateHandle<StreamStateHandle> generateStateForVertex(
-			JobVertexID jobVertexID,
-			int index) throws IOException {
+		JobVertexID jobVertexID,
+		int index) throws IOException {
 
 		Random random = new Random(jobVertexID.hashCode() + index);
 		int value = random.nextInt();
@@ -2982,17 +3009,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	public static ChainedStateHandle<StreamStateHandle> generateChainedStateHandle(
-			Serializable value) throws IOException {
+		Serializable value) throws IOException {
 		return ChainedStateHandle.wrapSingleHandle(
-				TestByteStreamStateHandleDeepCompare.fromSerializable(String.valueOf(UUID.randomUUID()), value));
+			TestByteStreamStateHandleDeepCompare.fromSerializable(String.valueOf(UUID.randomUUID()), value));
 	}
 
 	public static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle(
-			JobVertexID jobVertexID,
-			int index,
-			int namedStates,
-			int partitionsPerState,
-			boolean rawState) throws IOException {
+		JobVertexID jobVertexID,
+		int index,
+		int namedStates,
+		int partitionsPerState,
+		boolean rawState) throws IOException {
 
 		Map<String, List<? extends Serializable>> statesListsMap = new HashMap<>(namedStates);
 
@@ -3015,7 +3042,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	private static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle(
-			Map<String, List<? extends Serializable>> states) throws IOException {
+		Map<String, List<? extends Serializable>> states) throws IOException {
 
 		List<List<? extends Serializable>> namedStateSerializables = new ArrayList<>(states.size());
 
@@ -3030,26 +3057,26 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		int idx = 0;
 		for (Map.Entry<String, List<? extends Serializable>> entry : states.entrySet()) {
 			offsetsMap.put(
-					entry.getKey(),
-					new OperatorStateHandle.StateMetaInfo(
-							serializationWithOffsets.f1.get(idx),
-							OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+				entry.getKey(),
+				new OperatorStateHandle.StateMetaInfo(
+					serializationWithOffsets.f1.get(idx),
+					OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
 			++idx;
 		}
 
 		ByteStreamStateHandle streamStateHandle = new TestByteStreamStateHandleDeepCompare(
-				String.valueOf(UUID.randomUUID()),
-				serializationWithOffsets.f0);
+			String.valueOf(UUID.randomUUID()),
+			serializationWithOffsets.f0);
 
 		OperatorStateHandle operatorStateHandle =
-				new OperatorStateHandle(offsetsMap, streamStateHandle);
+			new OperatorStateHandle(offsetsMap, streamStateHandle);
 		return ChainedStateHandle.wrapSingleHandle(operatorStateHandle);
 	}
 
 	static ExecutionJobVertex mockExecutionJobVertex(
-			JobVertexID jobVertexID,
-			int parallelism,
-			int maxParallelism) {
+		JobVertexID jobVertexID,
+		int parallelism,
+		int maxParallelism) {
 
 		return mockExecutionJobVertex(
 			jobVertexID,
@@ -3131,7 +3158,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
 		when(jobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
-		
+
 		when(vertex.getJobVertex()).thenReturn(jobVertex);
 
 		return vertex;
@@ -3158,8 +3185,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	public static void verifyStateRestore(
-			JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex,
-			List<KeyGroupRange> keyGroupPartitions) throws Exception {
+		JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex,
+		List<KeyGroupRange> keyGroupPartitions) throws Exception {
 
 		for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
 
@@ -3168,28 +3195,28 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			ChainedStateHandle<StreamStateHandle> expectNonPartitionedState = generateStateForVertex(jobVertexID, i);
 			ChainedStateHandle<StreamStateHandle> actualNonPartitionedState = taskStateHandles.getLegacyOperatorState();
 			assertTrue(CommonTestUtils.isSteamContentEqual(
-					expectNonPartitionedState.get(0).openInputStream(),
-					actualNonPartitionedState.get(0).openInputStream()));
+				expectNonPartitionedState.get(0).openInputStream(),
+				actualNonPartitionedState.get(0).openInputStream()));
 
 			ChainedStateHandle<OperatorStateHandle> expectedOpStateBackend =
-					generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
+				generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
 
 			List<Collection<OperatorStateHandle>> actualPartitionableState = taskStateHandles.getManagedOperatorState();
 
 			assertTrue(CommonTestUtils.isSteamContentEqual(
-					expectedOpStateBackend.get(0).openInputStream(),
-					actualPartitionableState.get(0).iterator().next().openInputStream()));
+				expectedOpStateBackend.get(0).openInputStream(),
+				actualPartitionableState.get(0).iterator().next().openInputStream()));
 
 			KeyGroupsStateHandle expectPartitionedKeyGroupState = generateKeyGroupState(
-					jobVertexID, keyGroupPartitions.get(i), false);
+				jobVertexID, keyGroupPartitions.get(i), false);
 			Collection<KeyedStateHandle> actualPartitionedKeyGroupState = taskStateHandles.getManagedKeyedState();
 			compareKeyedState(Collections.singletonList(expectPartitionedKeyGroupState), actualPartitionedKeyGroupState);
 		}
 	}
 
 	public static void compareKeyedState(
-			Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState,
-			Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception {
+		Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState,
+		Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception {
 
 		KeyGroupsStateHandle expectedHeadOpKeyGroupStateHandle = expectPartitionedKeyGroupState.iterator().next();
 		int expectedTotalKeyGroups = expectedHeadOpKeyGroupStateHandle.getKeyGroupRange().getNumberOfKeyGroups();
@@ -3207,7 +3234,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
 				inputStream.seek(offset);
 				int expectedKeyGroupState =
-						InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader());
+					InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader());
 				for (KeyedStateHandle oneActualKeyedStateHandle : actualPartitionedKeyGroupState) {
 
 					assertTrue(oneActualKeyedStateHandle instanceof KeyGroupsStateHandle);
@@ -3218,7 +3245,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 						try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.openInputStream()) {
 							actualInputStream.seek(actualOffset);
 							int actualGroupState = InstantiationUtil.
-									deserializeObject(actualInputStream, Thread.currentThread().getContextClassLoader());
+								deserializeObject(actualInputStream, Thread.currentThread().getContextClassLoader());
 							assertEquals(expectedKeyGroupState, actualGroupState);
 						}
 					}
@@ -3228,8 +3255,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 	}
 
 	public static void comparePartitionableState(
-			List<ChainedStateHandle<OperatorStateHandle>> expected,
-			List<List<Collection<OperatorStateHandle>>> actual) throws Exception {
+		List<ChainedStateHandle<OperatorStateHandle>> expected,
+		List<List<Collection<OperatorStateHandle>>> actual) throws Exception {
 
 		List<String> expectedResult = new ArrayList<>();
 		for (ChainedStateHandle<OperatorStateHandle> chainedStateHandle : expected) {
@@ -3263,7 +3290,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				for (long offset : entry.getValue().getOffsets()) {
 					in.seek(offset);
 					Integer state = InstantiationUtil.
-							deserializeObject(in, Thread.currentThread().getContextClassLoader());
+						deserializeObject(in, Thread.currentThread().getContextClassLoader());
 					resultCollector.add(opIdx + " : " + entry.getKey() + " : " + state);
 				}
 			}
@@ -3308,24 +3335,25 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// Periodic
 		CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
-				System.currentTimeMillis(),
-				CheckpointProperties.forStandardCheckpoint(),
-				null,
-				true);
+			System.currentTimeMillis(),
+			CheckpointProperties.forStandardCheckpoint(),
+			null,
+			true);
 
 		assertTrue(triggerResult.isFailure());
 		assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, triggerResult.getFailureReason());
 
 		// Not periodic
 		triggerResult = coord.triggerCheckpoint(
-				System.currentTimeMillis(),
-				CheckpointProperties.forStandardCheckpoint(),
-				null,
-				false);
+			System.currentTimeMillis(),
+			CheckpointProperties.forStandardCheckpoint(),
+			null,
+			false);
 
 		assertFalse(triggerResult.isFailure());
 	}
@@ -3352,12 +3380,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			int maxPartitionsPerState = 1 + r.nextInt(9);
 
 			doTestPartitionableStateRepartitioning(
-					r, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState);
+				r, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState);
 		}
 	}
 
 	private void doTestPartitionableStateRepartitioning(
-			Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) {
+		Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) {
 
 		List<OperatorStateHandle> previousParallelOpInstanceStates = new ArrayList<>(oldParallelism);
 
@@ -3374,15 +3402,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
 				}
 
 				OperatorStateHandle.Mode mode = r.nextInt(10) == 0 ?
-						OperatorStateHandle.Mode.BROADCAST : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
+					OperatorStateHandle.Mode.BROADCAST : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
 				namedStatesToOffsets.put(
-						"State-" + s,
-						new OperatorStateHandle.StateMetaInfo(offs, mode));
+					"State-" + s,
+					new OperatorStateHandle.StateMetaInfo(offs, mode));
 
 			}
 
 			previousParallelOpInstanceStates.add(
-					new OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1)));
+				new OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1)));
 		}
 
 		Map<StreamStateHandle, Map<String, List<Long>>> expected = new HashMap<>();
@@ -3395,7 +3423,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 				long[] offs = e.getValue().getOffsets();
 				int replication = e.getValue().getDistributionMode().equals(OperatorStateHandle.Mode.BROADCAST) ?
-						newParallelism : 1;
+					newParallelism : 1;
 
 				expectedTotalPartitions += replication * offs.length;
 				List<Long> offsList = new ArrayList<>(offs.length);
@@ -3413,7 +3441,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
 
 		List<Collection<OperatorStateHandle>> pshs =
-				repartitioner.repartitionState(previousParallelOpInstanceStates, newParallelism);
+			repartitioner.repartitionState(previousParallelOpInstanceStates, newParallelism);
 
 		Map<StreamStateHandle, Map<String, List<Long>>> actual = new HashMap<>();
 
@@ -3486,7 +3514,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
 		coord.setCheckpointStatsTracker(tracker);
@@ -3524,7 +3553,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			new StandaloneCheckpointIDCounter(),
 			store,
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		store.addCheckpoint(new CompletedCheckpoint(
 			new JobID(),
@@ -3580,7 +3610,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			checkpointIDCounter,
 			completedCheckpointStore,
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		// trigger a first checkpoint
 		assertTrue(

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 7d24568..0888cff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.util.SerializableObject;
@@ -109,7 +110,8 @@ public class CheckpointStateRestoreTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -183,7 +185,8 @@ public class CheckpointStateRestoreTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1),
 				null,
-				Executors.directExecutor());
+				Executors.directExecutor(),
+				SharedStateRegistry.DEFAULT_FACTORY);
 
 			try {
 				coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -240,7 +243,8 @@ public class CheckpointStateRestoreTest {
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,
-			Executors.directExecutor());
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY);
 
 		StreamStateHandle serializedState = CheckpointCoordinatorTest
 				.generateChainedStateHandle(new SerializableObject())