You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/19 09:01:46 UTC

[1/4] flink git commit: [FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes

Repository: flink
Updated Branches:
  refs/heads/release-1.3 0963718ac -> f62004079


http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
new file mode 100644
index 0000000..0c215cd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for basic {@link ZooKeeperStateHandleStore} behaviour.
+ *
+ * <p> Tests include:
+ * <ul>
+ * <li>Expected usage of operations</li>
+ * <li>Correct ordering of ZooKeeper and state handle operations</li>
+ * </ul>
+ */
+public class ZooKeeperStateHandleStoreTest extends TestLogger {
+
+	private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (ZOOKEEPER != null) {
+			ZOOKEEPER.shutdown();
+		}
+	}
+
+	@Before
+	public void cleanUp() throws Exception {
+		ZOOKEEPER.deleteAll();
+	}
+
+	/**
+	 * Tests add operation with lock.
+	 */
+	@Test
+	public void testAddAndLock() throws Exception {
+		LongStateStorage longStateStorage = new LongStateStorage();
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
+				ZOOKEEPER.getClient(), longStateStorage, Executors.directExecutor());
+
+		// Config
+		final String pathInZooKeeper = "/testAdd";
+		final Long state = 1239712317L;
+
+		// Test
+		store.addAndLock(pathInZooKeeper, state);
+
+		// Verify
+		// State handle created
+		assertEquals(1, store.getAllAndLock().size());
+		assertEquals(state, store.getAndLock(pathInZooKeeper).retrieveState());
+
+		// Path created and is persistent
+		Stat stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper);
+		assertNotNull(stat);
+		assertEquals(0, stat.getEphemeralOwner());
+
+		List<String> children = ZOOKEEPER.getClient().getChildren().forPath(pathInZooKeeper);
+
+		// there should be one child which is the lock
+		assertEquals(1, children.size());
+
+		stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper + '/' + children.get(0));
+		assertNotNull(stat);
+
+		// check that the child is an ephemeral node
+		assertNotEquals(0, stat.getEphemeralOwner());
+
+		// Data is equal
+		@SuppressWarnings("unchecked")
+		Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
+				ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper),
+				ClassLoader.getSystemClassLoader())).retrieveState();
+
+		assertEquals(state, actual);
+	}
+
+	/**
+	 * Tests that adding an existing path fails and that the already written state handle is discarded.
+	 */
+	@Test
+	public void testAddAlreadyExistingPath() throws Exception {
+		LongStateStorage stateHandleProvider = new LongStateStorage();
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+		ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath");
+		try {
+			store.addAndLock("/testAddAlreadyExistingPath", 1L);
+			fail("Adding an already existing path should have failed.");
+		} catch (Exception ignored) {
+			// expected: the ZNode already exists
+		}
+
+		// the state was written to the storage, but its handle must be discarded since the add failed
+		assertEquals(1, stateHandleProvider.getStateHandles().size());
+		assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
+	}
+
+	/**
+	 * Tests that the created state handle is discarded if ZooKeeper create fails.
+	 */
+	@Test
+	public void testAddDiscardStateHandleAfterFailure() throws Exception {
+		// Setup
+		LongStateStorage stateHandleProvider = new LongStateStorage();
+
+		CuratorFramework client = spy(ZOOKEEPER.getClient());
+		when(client.inTransaction()).thenThrow(new RuntimeException("Expected test Exception."));
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				client, stateHandleProvider, Executors.directExecutor());
+
+		// Config
+		final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
+		final Long state = 81282227L;
+
+		try {
+			// Test
+			store.addAndLock(pathInZooKeeper, state);
+			fail("Did not throw expected exception");
+		}
+		catch (Exception ignored) {
+		}
+
+		// Verify
+		// State handle created and discarded
+		assertEquals(1, stateHandleProvider.getStateHandles().size());
+		assertEquals(state, stateHandleProvider.getStateHandles().get(0).retrieveState());
+		assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
+	}
+
+	/**
+	 * Tests that a state handle is replaced.
+	 */
+	@Test
+	public void testReplace() throws Exception {
+		// Setup
+		LongStateStorage stateHandleProvider = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+		// Config
+		final String pathInZooKeeper = "/testReplace";
+		final Long initialState = 30968470898L;
+		final Long replaceState = 88383776661L;
+
+		// Test
+		store.addAndLock(pathInZooKeeper, initialState);
+		store.replace(pathInZooKeeper, 0, replaceState);
+
+		// Verify
+		// State handles created
+		assertEquals(2, stateHandleProvider.getStateHandles().size());
+		assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState());
+		assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState());
+
+		// Path created and is persistent
+		Stat stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper);
+		assertNotNull(stat);
+		assertEquals(0, stat.getEphemeralOwner());
+
+		// Data is equal
+		@SuppressWarnings("unchecked")
+		Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
+				ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper),
+				ClassLoader.getSystemClassLoader())).retrieveState();
+
+		assertEquals(replaceState, actual);
+	}
+
+	/**
+	 * Tests that a non existing path throws an Exception.
+	 */
+	@Test(expected = Exception.class)
+	public void testReplaceNonExistingPath() throws Exception {
+		RetrievableStateStorageHelper<Long> stateStorage = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZOOKEEPER.getClient(), stateStorage, Executors.directExecutor());
+
+		store.replace("/testReplaceNonExistingPath", 0, 1L);
+	}
+
+	/**
+	 * Tests that the replace state handle is discarded if ZooKeeper setData fails.
+	 */
+	@Test
+	public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
+		// Setup
+		LongStateStorage stateHandleProvider = new LongStateStorage();
+
+		CuratorFramework client = spy(ZOOKEEPER.getClient());
+		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				client, stateHandleProvider, Executors.directExecutor());
+
+		// Config
+		final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
+		final Long initialState = 30968470898L;
+		final Long replaceState = 88383776661L;
+
+		// Test
+		store.addAndLock(pathInZooKeeper, initialState);
+
+		try {
+			store.replace(pathInZooKeeper, 0, replaceState);
+			fail("Did not throw expected exception");
+		}
+		catch (Exception ignored) {
+		}
+
+		// Verify
+		// State handle created and discarded
+		assertEquals(2, stateHandleProvider.getStateHandles().size());
+		assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState());
+		assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState());
+		assertEquals(1, stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls());
+
+		// Initial value
+		@SuppressWarnings("unchecked")
+		Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
+				ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper),
+				ClassLoader.getSystemClassLoader())).retrieveState();
+
+		assertEquals(initialState, actual);
+	}
+
+	/**
+	 * Tests get operation.
+	 */
+	@Test
+	public void testGetAndExists() throws Exception {
+		// Setup
+		LongStateStorage stateHandleProvider = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+		// Config
+		final String pathInZooKeeper = "/testGetAndExists";
+		final Long state = 311222268470898L;
+
+		// Test
+		assertEquals(-1, store.exists(pathInZooKeeper));
+
+		store.addAndLock(pathInZooKeeper, state);
+		RetrievableStateHandle<Long> actual = store.getAndLock(pathInZooKeeper);
+
+		// Verify
+		assertEquals(state, actual.retrieveState());
+		assertTrue(store.exists(pathInZooKeeper) >= 0);
+	}
+
+	/**
+	 * Tests that a non existing path throws an Exception.
+	 */
+	@Test(expected = Exception.class)
+	public void testGetNonExistingPath() throws Exception {
+		LongStateStorage stateHandleProvider = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+		store.getAndLock("/testGetNonExistingPath");
+	}
+
+	/**
+	 * Tests that all added state is returned.
+	 */
+	@Test
+	public void testGetAll() throws Exception {
+		// Setup
+		LongStateStorage stateHandleProvider = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+		// Config
+		final String pathInZooKeeper = "/testGetAll";
+
+		final Set<Long> expected = new HashSet<>();
+		expected.add(311222268470898L);
+		expected.add(132812888L);
+		expected.add(27255442L);
+		expected.add(11122233124L);
+
+		// Test
+		for (long val : expected) {
+			store.addAndLock(pathInZooKeeper + val, val);
+		}
+
+		for (Tuple2<RetrievableStateHandle<Long>, String> val : store.getAllAndLock()) {
+			assertTrue(expected.remove(val.f0.retrieveState()));
+		}
+		assertEquals(0, expected.size());
+	}
+
+	/**
+	 * Tests that the state is returned sorted.
+	 */
+	@Test
+	public void testGetAllSortedByName() throws Exception {
+		// Setup
+		LongStateStorage stateHandleProvider = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+		// Config
+		final String basePath = "/testGetAllSortedByName";
+
+		final Long[] expected = new Long[] {
+				311222268470898L, 132812888L, 27255442L, 11122233124L };
+
+		// Test
+		for (long val : expected) {
+			final String pathInZooKeeper = String.format("%s%016d", basePath, val);
+			store.addAndLock(pathInZooKeeper, val);
+		}
+
+		List<Tuple2<RetrievableStateHandle<Long>, String>> actual = store.getAllSortedByNameAndLock();
+		assertEquals(expected.length, actual.size());
+
+		// bring the elements in sort order
+		Arrays.sort(expected);
+
+		for (int i = 0; i < expected.length; i++) {
+			assertEquals(expected[i], actual.get(i).f0.retrieveState());
+		}
+	}
+
+	/**
+	 * Tests that state handles are correctly removed.
+	 */
+	@Test
+	public void testRemove() throws Exception {
+		// Setup
+		LongStateStorage stateHandleProvider = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+		// Config
+		final String pathInZooKeeper = "/testRemove";
+		final Long state = 27255442L;
+
+		store.addAndLock(pathInZooKeeper, state);
+
+		// Test
+		store.releaseAndTryRemove(pathInZooKeeper);
+
+		// Verify discarded
+		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
+	}
+
+	/**
+	 * Tests that state handles are correctly removed with a callback.
+	 */
+	@Test
+	public void testRemoveWithCallback() throws Exception {
+		// Setup
+		LongStateStorage stateHandleProvider = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+		// Config
+		final String pathInZooKeeper = "/testRemoveWithCallback";
+		final Long state = 27255442L;
+
+		store.addAndLock(pathInZooKeeper, state);
+
+		final CountDownLatch sync = new CountDownLatch(1);
+		ZooKeeperStateHandleStore.RemoveCallback<Long> callback = mock(ZooKeeperStateHandleStore.RemoveCallback.class);
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				sync.countDown();
+				return null;
+			}
+		}).when(callback).apply(any(RetrievableStateHandle.class));
+
+		// Test
+		store.releaseAndTryRemove(pathInZooKeeper, callback);
+
+		// Verify discarded and callback called
+		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
+
+		sync.await();
+
+		verify(callback, times(1))
+				.apply(any(RetrievableStateHandle.class));
+	}
+
+	/** Tests that all state handles are correctly discarded. */
+	@Test
+	public void testReleaseAndTryRemoveAll() throws Exception {
+		// Setup
+		LongStateStorage stateHandleProvider = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+				ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor());
+
+		// Config
+		final String pathInZooKeeper = "/testDiscardAll";
+
+		final Set<Long> expected = new HashSet<>();
+		expected.add(311222268470898L);
+		expected.add(132812888L);
+		expected.add(27255442L);
+		expected.add(11122233124L);
+
+		// Test
+		for (long val : expected) {
+			store.addAndLock(pathInZooKeeper + val, val);
+		}
+
+		store.releaseAndTryRemoveAll();
+
+		// Verify all discarded
+		assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size());
+	}
+
+	/**
+	 * Tests that the ZooKeeperStateHandleStore can handle corrupted data by releasing and trying to remove the
+	 * respective ZooKeeper ZNodes.
+	 */
+	@Test
+	public void testCorruptedData() throws Exception {
+		LongStateStorage stateStorage = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+			ZOOKEEPER.getClient(),
+			stateStorage,
+			Executors.directExecutor());
+
+		final Collection<Long> input = new HashSet<>();
+		input.add(1L);
+		input.add(2L);
+		input.add(3L);
+
+		for (Long aLong : input) {
+			store.addAndLock("/" + aLong, aLong);
+		}
+
+		// corrupt one of the entries
+		ZOOKEEPER.getClient().setData().forPath("/" + 2, new byte[2]);
+
+		List<Tuple2<RetrievableStateHandle<Long>, String>> allEntries = store.getAllAndLock();
+
+		Collection<Long> expected = new HashSet<>(input);
+		expected.remove(2L);
+
+		Collection<Long> actual = new HashSet<>(expected.size());
+
+		for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
+			actual.add(entry.f0.retrieveState());
+		}
+
+		assertEquals(expected, actual);
+
+		// check the same for the all sorted by name call
+		allEntries = store.getAllSortedByNameAndLock();
+
+		actual.clear();
+
+		for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
+			actual.add(entry.f0.retrieveState());
+		}
+
+		assertEquals(expected, actual);
+
+		Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/" + 2);
+
+		// check that the corrupted node no longer exists
+		assertNull("The corrupted node should no longer exist.", stat);
+	}
+
+	/**
+	 * FLINK-6612
+	 *
+	 * Tests that a concurrent delete operation cannot succeed if another instance holds a lock on the specified
+	 * node.
+	 */
+	@Test
+	public void testConcurrentDeleteOperation() throws Exception {
+		LongStateStorage longStateStorage = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
+			ZOOKEEPER.getClient(),
+			longStateStorage,
+			Executors.directExecutor());
+
+		ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
+			ZOOKEEPER.getClient(),
+			longStateStorage,
+			Executors.directExecutor());
+
+		final String statePath = "/state";
+
+		zkStore1.addAndLock(statePath, 42L);
+		RetrievableStateHandle<Long> stateHandle = zkStore2.getAndLock(statePath);
+
+		// this should not remove the referenced node because we are still holding a state handle
+		// reference via zkStore2
+		zkStore1.releaseAndTryRemove(statePath);
+
+		// sanity check
+		assertEquals(42L, (long) stateHandle.retrieveState());
+
+		Stat nodeStat = ZOOKEEPER.getClient().checkExists().forPath(statePath);
+
+		assertNotNull("NodeStat should not be null, otherwise the referenced node does not exist.", nodeStat);
+
+		zkStore2.releaseAndTryRemove(statePath);
+
+		nodeStat = ZOOKEEPER.getClient().checkExists().forPath(statePath);
+
+		assertNull("NodeStat should be null, because the referenced node should no longer exist.", nodeStat);
+	}
+
+	/**
+	 * FLINK-6612
+	 *
+	 * Tests that getAndLock removes a created lock if the RetrievableStateHandle cannot be retrieved
+	 * (e.g. deserialization problem).
+	 */
+	@Test
+	public void testLockCleanupWhenGetAndLockFails() throws Exception {
+		LongStateStorage longStateStorage = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> zkStore1 = new ZooKeeperStateHandleStore<>(
+			ZOOKEEPER.getClient(),
+			longStateStorage,
+			Executors.directExecutor());
+
+		ZooKeeperStateHandleStore<Long> zkStore2 = new ZooKeeperStateHandleStore<>(
+			ZOOKEEPER.getClient(),
+			longStateStorage,
+			Executors.directExecutor());
+
+		final String path = "/state";
+
+		zkStore1.addAndLock(path, 42L);
+
+		final byte[] corruptedData = {1, 2};
+
+		// corrupt the data
+		ZOOKEEPER.getClient().setData().forPath(path, corruptedData);
+
+		try {
+			zkStore2.getAndLock(path);
+			fail("Should fail because we cannot deserialize the node's data");
+		} catch (IOException ignored) {
+			// expected to fail
+		}
+
+		// check that there is no lock node left
+		String lockNodePath = zkStore2.getLockPath(path);
+
+		Stat stat = ZOOKEEPER.getClient().checkExists().forPath(lockNodePath);
+
+		// zkStore2 should not have created a lock node
+		assertNull("zkStore2 should not have created a lock node.", stat);
+
+		Collection<String> children = ZOOKEEPER.getClient().getChildren().forPath(path);
+
+		// there should be exactly one lock node from zkStore1
+		assertEquals(1, children.size());
+
+		zkStore1.releaseAndTryRemove(path);
+
+		stat = ZOOKEEPER.getClient().checkExists().forPath(path);
+
+		assertNull("The state node should have been removed.", stat);
+	}
+
+	/**
+	 * FLINK-6612
+	 *
+	 * Tests that lock nodes will be released if the client dies.
+	 */
+	@Test
+	public void testLockCleanupWhenClientTimesOut() throws Exception {
+		LongStateStorage longStateStorage = new LongStateStorage();
+
+		Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOOKEEPER.getConnectString());
+		configuration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 100);
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT, "timeout");
+
+		try (CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+			CuratorFramework client2 = ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+			ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
+				client,
+				longStateStorage,
+				Executors.directExecutor());
+
+			final String path = "/state";
+
+			zkStore.addAndLock(path, 42L);
+
+			// this should delete all ephemeral nodes
+			client.close();
+
+			Stat stat = client2.checkExists().forPath(path);
+
+			// check that our state node still exists
+			assertNotNull(stat);
+
+			Collection<String> children = client2.getChildren().forPath(path);
+
+			// check that the lock node has been released
+			assertEquals(0, children.size());
+		}
+	}
+
+	/**
+	 * FLINK-6612
+	 *
+	 * Tests that we can release a locked state handle in the ZooKeeperStateHandleStore.
+	 */
+	@Test
+	public void testRelease() throws Exception {
+		LongStateStorage longStateStorage = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
+			ZOOKEEPER.getClient(),
+			longStateStorage,
+			Executors.directExecutor());
+
+		final String path = "/state";
+
+		zkStore.addAndLock(path, 42L);
+
+		final String lockPath = zkStore.getLockPath(path);
+
+		Stat stat = ZOOKEEPER.getClient().checkExists().forPath(lockPath);
+
+		assertNotNull("Expected an existing lock", stat);
+
+		zkStore.release(path);
+
+		stat = ZOOKEEPER.getClient().checkExists().forPath(path);
+		assertNotNull("The state node should still exist after releasing the lock.", stat);
+		// release should have removed the lock child
+		assertEquals("Expected no lock nodes as children", 0, stat.getNumChildren());
+
+		zkStore.releaseAndTryRemove(path);
+
+		stat = ZOOKEEPER.getClient().checkExists().forPath(path);
+
+		assertNull("State node should have been removed.", stat);
+	}
+
+	/**
+	 * FLINK-6612
+	 *
+	 * Tests that we can release all locked state handles in the ZooKeeperStateHandleStore.
+	 */
+	@Test
+	public void testReleaseAll() throws Exception {
+		LongStateStorage longStateStorage = new LongStateStorage();
+
+		ZooKeeperStateHandleStore<Long> zkStore = new ZooKeeperStateHandleStore<>(
+			ZOOKEEPER.getClient(),
+			longStateStorage,
+			Executors.directExecutor());
+
+		final Collection<String> paths = Arrays.asList("/state1", "/state2", "/state3");
+
+		for (String path : paths) {
+			zkStore.addAndLock(path, 42L);
+		}
+
+		for (String path : paths) {
+			Stat stat = ZOOKEEPER.getClient().checkExists().forPath(zkStore.getLockPath(path));
+
+			assertNotNull("Expected an existing lock.", stat);
+		}
+
+		zkStore.releaseAll();
+
+		for (String path : paths) {
+			Stat stat = ZOOKEEPER.getClient().checkExists().forPath(path);
+			assertNotNull("The state node should still exist after releasing all locks.", stat);
+			assertEquals(0, stat.getNumChildren());
+		}
+
+		zkStore.releaseAndTryRemoveAll();
+
+		Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/");
+
+		assertEquals(0, stat.getNumChildren());
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// Simple test helpers
+	// ---------------------------------------------------------------------------------------------
+
+	private static class LongStateStorage implements RetrievableStateStorageHelper<Long> {
+
+		private final List<LongRetrievableStateHandle> stateHandles = new ArrayList<>();
+
+		@Override
+		public RetrievableStateHandle<Long> store(Long state) throws Exception {
+			LongRetrievableStateHandle stateHandle = new LongRetrievableStateHandle(state);
+			stateHandles.add(stateHandle);
+
+			return stateHandle;
+		}
+
+		List<LongRetrievableStateHandle> getStateHandles() {
+			return stateHandles;
+		}
+	}
+
+	private static class LongRetrievableStateHandle implements RetrievableStateHandle<Long> {
+
+		private static final long serialVersionUID = -3555329254423838912L;
+
+		private final Long state;
+
+		private int numberOfDiscardCalls;
+
+		public LongRetrievableStateHandle(Long state) {
+			this.state = state;
+		}
+
+		@Override
+		public Long retrieveState() throws Exception {
+			return state;
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			numberOfDiscardCalls++;
+		}
+
+		@Override
+		public long getStateSize() {
+			return 0;
+		}
+
+		public int getNumberOfDiscardCalls() {
+			return numberOfDiscardCalls;
+		}
+	}
+}


[2/4] flink git commit: [FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes

Posted by sr...@apache.org.
[FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes

In order to guard against deletions of ZooKeeper nodes which are still being used
by a different ZooKeeperStateHandleStore, we have to introduce a locking mechanism.
Only after all ZooKeeperStateHandleStores have released their locks is the ZNode
allowed to be deleted.

The locking mechanism is implemented via ephemeral child nodes of the respective
ZooKeeper node. Whenever a ZooKeeperStateHandleStore wants to lock a ZNode, thereby
protecting it from being deleted, it creates an ephemeral child node. The node's
name is unique to the ZooKeeperStateHandleStore instance. The delete operations
will then only delete the node if it has no children (i.e. no remaining locks).

In order to guard against orphaned lock nodes, they are created as ephemeral nodes.
This means that they will be deleted by ZooKeeper once the session of the
ZooKeeper client which created the node times out.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f58fec70
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f58fec70
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f58fec70

Branch: refs/heads/release-1.3
Commit: f58fec70fef12056bd58b6cc2985532ccb07625e
Parents: 0963718
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 17 14:52:04 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri May 19 11:00:07 2017 +0200

----------------------------------------------------------------------
 .../store/ZooKeeperMesosWorkerStore.java        |   8 +-
 .../ZooKeeperCompletedCheckpointStore.java      | 150 ++--
 .../ZooKeeperSubmittedJobGraphStore.java        |  50 +-
 .../zookeeper/ZooKeeperStateHandleStore.java    | 419 +++++++---
 .../CompletedCheckpointStoreTest.java           |   9 +
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 133 ++-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  11 +-
 .../ZooKeeperStateHandleStoreITCase.java        | 642 ---------------
 .../ZooKeeperStateHandleStoreTest.java          | 805 +++++++++++++++++++
 9 files changed, 1345 insertions(+), 882 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 42abd4c..663ce56 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -88,7 +88,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 				totalTaskCountInZooKeeper.close();
 
 				if(cleanup) {
-					workersInZooKeeper.removeAndDiscardAllState();
+					workersInZooKeeper.releaseAndTryRemoveAll();
 				}
 
 				isRunning = false;
@@ -169,7 +169,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 		synchronized (startStopLock) {
 			verifyIsRunning();
 
-			List<Tuple2<RetrievableStateHandle<Worker>, String>> handles = workersInZooKeeper.getAll();
+			List<Tuple2<RetrievableStateHandle<Worker>, String>> handles = workersInZooKeeper.getAllAndLock();
 
 			if(handles.size() != 0) {
 				List<MesosWorkerStore.Worker> workers = new ArrayList<>(handles.size());
@@ -199,7 +199,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 			int currentVersion = workersInZooKeeper.exists(path);
 			if (currentVersion == -1) {
 				try {
-					workersInZooKeeper.add(path, worker);
+					workersInZooKeeper.addAndLock(path, worker);
 					LOG.debug("Added {} in ZooKeeper.", worker);
 				} catch (KeeperException.NodeExistsException ex) {
 					throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex);
@@ -227,7 +227,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 				return false;
 			}
 
-			workersInZooKeeper.removeAndDiscardState(path);
+			workersInZooKeeper.releaseAndTryRemove(path);
 			LOG.debug("Removed worker {} from ZooKeeper.", taskID);
 			return true;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 95cfb0f..084d93e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -19,9 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -34,12 +31,12 @@ import org.apache.flink.util.FlinkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -155,7 +152,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
 		while (true) {
 			try {
-				initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
+				initialCheckpoints = checkpointsInZooKeeper.getAllSortedByNameAndLock();
 				break;
 			}
 			catch (ConcurrentModificationException e) {
@@ -178,7 +175,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 					"checkpoint store.", e);
 
 				// remove the checkpoint with broken state handle
-				removeBrokenStateHandle(checkpointStateHandle);
+				removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
 			}
 
 			if (completedCheckpoint != null) {
@@ -201,7 +198,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		final RetrievableStateHandle<CompletedCheckpoint> stateHandle;
 
 		// First add the new one. If it fails, we don't want to loose existing data.
-		stateHandle = checkpointsInZooKeeper.add(path, checkpoint);
+		stateHandle = checkpointsInZooKeeper.addAndLock(path, checkpoint);
 
 		checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));
 
@@ -211,7 +208,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		// Everything worked, let's remove a previous checkpoint if necessary.
 		while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
 			try {
-				removeSubsumed(checkpointStateHandles.removeFirst(), sharedStateRegistry);
+				removeSubsumed(checkpointStateHandles.removeFirst().f1, sharedStateRegistry);
 			} catch (Exception e) {
 				LOG.warn("Failed to subsume the old checkpoint", e);
 			}
@@ -237,7 +234,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 					try {
 						// remove the checkpoint with broken state handle
-						removeBrokenStateHandle(checkpointStateHandles.pollLast());
+						Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint = checkpointStateHandles.pollLast();
+						removeBrokenStateHandle(checkpoint.f1, checkpoint.f0);
 					} catch (Exception removeException) {
 						LOG.warn("Could not remove the latest checkpoint with a broken state handle.", removeException);
 					}
@@ -265,7 +263,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 				// remove the checkpoint with broken state handle
 				stateHandleIterator.remove();
-				removeBrokenStateHandle(stateHandlePath);
+				removeBrokenStateHandle(stateHandlePath.f1, stateHandlePath.f0);
 			}
 		}
 
@@ -289,7 +287,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 			for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
 				try {
-					removeShutdown(checkpoint, jobStatus, sharedStateRegistry);
+					removeShutdown(checkpoint.f1, jobStatus, sharedStateRegistry);
 				} catch (Exception e) {
 					LOG.error("Failed to discard checkpoint.", e);
 				}
@@ -306,117 +304,87 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 			// Clear the local handles, but don't remove any state
 			checkpointStateHandles.clear();
+
+			// Release the state handle locks in ZooKeeper such that they can be deleted
+			checkpointsInZooKeeper.releaseAll();
 		}
 	}
 
 	// ------------------------------------------------------------------------
 
 	private void removeSubsumed(
-		final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
+		final String pathInZooKeeper, // ZooKeeper node of the subsumed checkpoint
 		final SharedStateRegistry sharedStateRegistry) throws Exception {
 		
-		Callable<Void> action = new Callable<Void>() {
+		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
 			@Override
-			public Void call() throws Exception {
-				CompletedCheckpoint completedCheckpoint = retrieveCompletedCheckpoint(stateHandleAndPath);
-				
-				if (completedCheckpoint != null) {
-					completedCheckpoint.discardOnSubsume(sharedStateRegistry);
-				}
+			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
+				if (value != null) { // presumably null when the handle could not be read before removal; TODO confirm
+					final CompletedCheckpoint completedCheckpoint;
+					try {
+						completedCheckpoint = value.retrieveState();
+					} catch (Exception e) {
+						throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e);
+					}
 
-				return null;
+					if (completedCheckpoint != null) {
+						try {
+							completedCheckpoint.discardOnSubsume(sharedStateRegistry);
+						} catch (Exception e) {
+							throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
+						}
+					}
+				}
 			}
 		};
 
-		remove(stateHandleAndPath, action);
+		checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, action);
 	}
 
 	private void removeShutdown(
-			final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
+			final String pathInZooKeeper, // ZooKeeper node of the checkpoint to discard
 			final JobStatus jobStatus,
 			final SharedStateRegistry sharedStateRegistry) throws Exception {
 
-		Callable<Void> action = new Callable<Void>() {
+		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
 			@Override
-			public Void call() throws Exception {
-				CompletedCheckpoint completedCheckpoint = retrieveCompletedCheckpoint(stateHandleAndPath);
-				
-				if (completedCheckpoint != null) {
-					completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
-				}
+			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
+				if (value != null) {
+					final CompletedCheckpoint completedCheckpoint;
+
+					try {
+						completedCheckpoint = value.retrieveState();
+					} catch (Exception e) {
+						throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e);
+					}
 
-				return null;
+					if (completedCheckpoint != null) {
+						try {
+							completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
+						} catch (Exception e) {
+							throw new FlinkException("Could not discard the completed checkpoint on shutdown.", e);
+						}
+					}
+				}
 			}
 		};
 
-		remove(stateHandleAndPath, action);
-	}
-
-	private void removeBrokenStateHandle(final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
-		remove(stateHandleAndPath, null);
+		checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, removeAction);
 	}
 
-	/**
-	 * Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle.
-	 */
-	private void remove(
-			final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
-			final Callable<Void> action) throws Exception {
-
-		BackgroundCallback callback = new BackgroundCallback() {
+	private void removeBrokenStateHandle(
+			final String pathInZooKeeper,
+			final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle) throws Exception {
+		checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
 			@Override
-			public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
-				final long checkpointId = pathToCheckpointId(stateHandleAndPath.f1);
-
+			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
 				try {
-					if (event.getType() == CuratorEventType.DELETE) {
-						if (event.getResultCode() == 0) {
-							Exception exception = null;
-
-							if (null != action) {
-								try {
-									action.call();
-								} catch (Exception e) {
-									exception = new Exception("Could not execute callable action " +
-										"for checkpoint " + checkpointId + '.', e);
-								}
-							}
-
-							try {
-								// Discard the state handle
-								stateHandleAndPath.f0.discardState();
-							} catch (Exception e) {
-								Exception newException = new Exception("Could not discard meta " +
-									"data for completed checkpoint " + checkpointId + '.', e);
-
-								if (exception == null) {
-									exception = newException;
-								} else {
-									exception.addSuppressed(newException);
-								}
-							}
-
-							if (exception != null) {
-								throw exception;
-							}
-						} else {
-							throw new IllegalStateException("Unexpected result code " +
-									event.getResultCode() + " in '" + event + "' callback.");
-						}
-					} else {
-						throw new IllegalStateException("Unexpected event type " +
-								event.getType() + " in '" + event + "' callback.");
-					}
+					retrievableStateHandle.discardState(); // NOTE(review): 'value' is unused and releaseAndTryRemove documents that it also discards the stored handle; confirm this is not a double discard
 				} catch (Exception e) {
-					LOG.warn("Failed to discard checkpoint {}.", checkpointId, e);
+					throw new FlinkException("Could not discard state handle.", e);
 				}
 			}
-		};
-
-		// Remove state handle from ZooKeeper first. If this fails, we can still recover, but if
-		// we remove a state handle and fail to remove it from ZooKeeper, we end up in an
-		// inconsistent state.
-		checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
+		});
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 2552088..fa972ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -157,36 +157,46 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	@Override
 	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
 		checkNotNull(jobId, "Job ID");
-		String path = getPathForJob(jobId);
+		final String path = getPathForJob(jobId);
 
 		LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
 
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
-			RetrievableStateHandle<SubmittedJobGraph> jobGraphRetrievableStateHandle;
+			boolean success = false;
 
 			try {
-				jobGraphRetrievableStateHandle = jobGraphsInZooKeeper.get(path);
-			} catch (KeeperException.NoNodeException ignored) {
-				return null;
-			} catch (Exception e) {
-				throw new Exception("Could not retrieve the submitted job graph state handle " +
-					"for " + path + "from the submitted job graph store.", e);
-			}
-			SubmittedJobGraph jobGraph;
+				RetrievableStateHandle<SubmittedJobGraph> jobGraphRetrievableStateHandle;
 
-			try {
-				jobGraph = jobGraphRetrievableStateHandle.retrieveState();
-			} catch (Exception e) {
-				throw new Exception("Failed to retrieve the submitted job graph from state handle.", e);
-			}
+				try {
+					jobGraphRetrievableStateHandle = jobGraphsInZooKeeper.getAndLock(path);
+				} catch (KeeperException.NoNodeException ignored) { // NOTE(review): getAndLock may wrap NoNodeException in a generic Exception; confirm this branch is reachable
+					success = true;
+					return null;
+				} catch (Exception e) {
+					throw new Exception("Could not retrieve the submitted job graph state handle " +
+						"for " + path + " from the submitted job graph store.", e);
+				}
+				SubmittedJobGraph jobGraph;
 
-			addedJobGraphs.add(jobGraph.getJobId());
+				try {
+					jobGraph = jobGraphRetrievableStateHandle.retrieveState();
+				} catch (Exception e) {
+					throw new Exception("Failed to retrieve the submitted job graph from state handle.", e);
+				}
 
-			LOG.info("Recovered {}.", jobGraph);
+				addedJobGraphs.add(jobGraph.getJobId());
 
-			return jobGraph;
+				LOG.info("Recovered {}.", jobGraph);
+
+				success = true;
+				return jobGraph;
+			} finally {
+				if (!success) {
+					jobGraphsInZooKeeper.release(path);
+				}
+			}
 		}
 	}
 
@@ -207,7 +217,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 
 				if (currentVersion == -1) {
 					try {
-						jobGraphsInZooKeeper.add(path, jobGraph);
+						jobGraphsInZooKeeper.addAndLock(path, jobGraph);
 
 						addedJobGraphs.add(jobGraph.getJobId());
 
@@ -245,7 +255,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 
 		synchronized (cacheLock) {
 			if (addedJobGraphs.contains(jobId)) {
-				jobGraphsInZooKeeper.removeAndDiscardState(path);
+				jobGraphsInZooKeeper.releaseAndTryRemove(path);
 
 				addedJobGraphs.remove(jobId);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 364ba0f..a548f1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -20,28 +20,38 @@ package org.apache.flink.runtime.zookeeper;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * State handles backed by ZooKeeper.
+ * Class which stores state via the provided {@link RetrievableStateStorageHelper} and writes the
+ * returned state handle to ZooKeeper. The ZooKeeper node can be locked by creating an ephemeral
+ * child and only allowing the deletion of the ZooKeeper node if it does not have any children.
+ * That way we protect concurrent accesses from different ZooKeeperStateHandleStore instances.
  *
  * <p>Added state is persisted via {@link RetrievableStateHandle RetrievableStateHandles},
  * which in turn are written to ZooKeeper. This level of indirection is necessary to keep the
@@ -80,6 +90,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 	private final Executor executor;
 
+	/** Lock node name of this ZooKeeperStateHandleStore. The name should be unique among all other state handle stores. */
+	private final String lockNode;
+
 	/**
 	 * Creates a {@link ZooKeeperStateHandleStore}.
 	 *
@@ -99,40 +112,36 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		this.client = checkNotNull(client, "Curator client");
 		this.storage = checkNotNull(storage, "State storage");
 		this.executor = checkNotNull(executor);
-	}
 
-	/**
-	 * Creates a state handle and stores it in ZooKeeper with create mode {@link
-	 * CreateMode#PERSISTENT}.
-	 *
-	 * @see #add(String, T, CreateMode)
-	 */
-	public RetrievableStateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
-		return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+		// Generate a unique lock node name
+		lockNode = UUID.randomUUID().toString();
 	}
 
 	/**
-	 * Creates a state handle and stores it in ZooKeeper.
+	 * Creates a state handle, stores it in ZooKeeper and locks it. A locked node cannot be removed by
+	 * another {@link ZooKeeperStateHandleStore} instance as long as this instance remains connected
+	 * to ZooKeeper.
 	 *
 	 * <p><strong>Important</strong>: This will <em>not</em> store the actual state in
 	 * ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection
 	 * makes sure that data in ZooKeeper is small.
 	 *
-	 * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and
-	 *                        start with a '/')
+	 * <p>The operation will fail if there is already a node under the given path.
+	 *
+	 * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet)
 	 * @param state           State to be added
-	 * @param createMode      The create mode for the new path in ZooKeeper
 	 *
 	 * @return The Created {@link RetrievableStateHandle}.
 	 * @throws Exception If a ZooKeeper or state handle operation fails
 	 */
-	public RetrievableStateHandle<T> add(
+	public RetrievableStateHandle<T> addAndLock(
 			String pathInZooKeeper,
-			T state,
-			CreateMode createMode) throws Exception {
+			T state) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 		checkNotNull(state, "State");
 
+		final String path = normalizePath(pathInZooKeeper);
+
 		RetrievableStateHandle<T> storeHandle = storage.store(state);
 
 		boolean success = false;
@@ -145,7 +154,11 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 			// smaller than the state itself. This level of indirection makes sure that data in
 			// ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
 			// the state can be larger.
-			client.create().withMode(createMode).forPath(pathInZooKeeper, serializedStoreHandle);
+			// Create the lock node in a transaction with the actual state node. That way we can prevent
+			// race conditions with a concurrent delete operation.
+			client.inTransaction().create().withMode(CreateMode.PERSISTENT).forPath(path, serializedStoreHandle)
+				.and().create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path))
+				.and().commit();
 
 			success = true;
 			return storeHandle;
@@ -172,7 +185,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 		checkNotNull(state, "State");
 
-		RetrievableStateHandle<T> oldStateHandle = get(pathInZooKeeper);
+		final String path = normalizePath(pathInZooKeeper);
+
+		RetrievableStateHandle<T> oldStateHandle = get(path, false);
 
 		RetrievableStateHandle<T> newStateHandle = storage.store(state);
 
@@ -185,7 +200,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 			// Replace state handle in ZooKeeper.
 			client.setData()
 					.withVersion(expectedVersion)
-					.forPath(pathInZooKeeper, serializedStateHandle);
+					.forPath(path, serializedStateHandle);
 			success = true;
 		} finally {
 			if(success) {
@@ -207,7 +222,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	public int exists(String pathInZooKeeper) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
-		Stat stat = client.checkExists().forPath(pathInZooKeeper);
+		final String path = normalizePath(pathInZooKeeper);
+
+		Stat stat = client.checkExists().forPath(path);
 
 		if (stat != null) {
 			return stat.getVersion();
@@ -217,32 +234,17 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	}
 
 	/**
-	 * Gets a state handle from ZooKeeper.
+	 * Gets the {@link RetrievableStateHandle} stored in the given ZooKeeper node and locks it. A
+	 * locked node cannot be removed by another {@link ZooKeeperStateHandleStore} instance as long
+	 * as this instance remains connected to ZooKeeper or until {@link #release(String)} is called.
 	 *
-	 * @param pathInZooKeeper Path in ZooKeeper to get the state handle from (expected to
-	 *                        exist and start with a '/').
-	 * @return The state handle
-	 * @throws Exception If a ZooKeeper or state handle operation fails
+	 * @param pathInZooKeeper Path to the ZooKeeper node which contains the state handle
+	 * @return The retrieved state handle from the specified ZooKeeper node
+	 * @throws IOException Thrown if the method failed to deserialize the stored state handle
+	 * @throws Exception Thrown if a ZooKeeper operation failed, e.g. if the node does not exist
 	 */
-	@SuppressWarnings("unchecked")
-	public RetrievableStateHandle<T> get(String pathInZooKeeper) throws Exception {
-		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
-
-		byte[] data;
-
-		try {
-			data = client.getData().forPath(pathInZooKeeper);
-		} catch (Exception e) {
-			throw new Exception("Failed to retrieve state handle data under " + pathInZooKeeper +
-				" from ZooKeeper.", e);
-		}
-
-		try {
-			return InstantiationUtil.deserializeObject(data, Thread.currentThread().getContextClassLoader());
-		} catch (IOException | ClassNotFoundException e) {
-			throw new IOException("Failed to deserialize state handle from ZooKeeper data from " +
-				pathInZooKeeper + '.', e);
-		}
+	public RetrievableStateHandle<T> getAndLock(String pathInZooKeeper) throws Exception {
+		return get(pathInZooKeeper, true);
 	}
 
 	/**
@@ -270,7 +272,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	}
 
 	/**
-	 * Gets all available state handles from ZooKeeper.
+	 * Gets all available state handles from ZooKeeper and locks the respective state nodes.
 	 *
 	 * <p>If there is a concurrent modification, the operation is retried until it succeeds.
 	 *
@@ -278,7 +280,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 * @throws Exception If a ZooKeeper or state handle operation fails
 	 */
 	@SuppressWarnings("unchecked")
-	public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() throws Exception {
+	public List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock() throws Exception {
 		final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
 
 		boolean success = false;
@@ -300,7 +302,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 					path = "/" + path;
 
 					try {
-						final RetrievableStateHandle<T> stateHandle = get(path);
+						final RetrievableStateHandle<T> stateHandle = getAndLock(path);
 						stateHandles.add(new Tuple2<>(stateHandle, path));
 					} catch (KeeperException.NoNodeException ignored) {
 						// Concurrent deletion, retry
@@ -323,7 +325,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 
 	/**
-	 * Gets all available state handles from ZooKeeper sorted by name (ascending).
+	 * Gets all available state handles from ZooKeeper sorted by name (ascending) and locks the
+	 * respective state nodes.
 	 *
 	 * <p>If there is a concurrent modification, the operation is retried until it succeeds.
 	 *
@@ -331,7 +334,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	 * @throws Exception If a ZooKeeper or state handle operation fails
 	 */
 	@SuppressWarnings("unchecked")
-	public List<Tuple2<RetrievableStateHandle<T>, String>> getAllSortedByName() throws Exception {
+	public List<Tuple2<RetrievableStateHandle<T>, String>> getAllSortedByNameAndLock() throws Exception {
 		final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
 
 		boolean success = false;
@@ -355,14 +358,16 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 					path = "/" + path;
 
 					try {
-						final RetrievableStateHandle<T> stateHandle = get(path);
+						final RetrievableStateHandle<T> stateHandle = getAndLock(path);
 						stateHandles.add(new Tuple2<>(stateHandle, path));
 					} catch (KeeperException.NoNodeException ignored) {
 						// Concurrent deletion, retry
 						continue retry;
 					} catch (IOException ioException) {
 						LOG.warn("Could not get all ZooKeeper children. Node {} contained " +
-							"corrupted data. Ignoring this node.", path, ioException);
+							"corrupted data. Releasing and trying to remove this node.", path, ioException);
+
+						releaseAndTryRemove(path);
 					}
 				}
 
@@ -370,6 +375,9 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 
 				// Check for concurrent modifications
 				success = initialCVersion == finalCVersion;
+
+				// we don't have to release all locked nodes in case of a concurrent modification, because we
+				// will retrieve them in the next iteration again.
 			}
 		}
 
@@ -377,75 +385,306 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
 	}
 
 	/**
-	 * Removes a state handle from ZooKeeper.
+	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
+	 * The deletion of the state node is executed asynchronously.
 	 *
-	 * <p><strong>Important</strong>: this does not discard the state handle. If you want to
-	 * discard the state handle call {@link #removeAndDiscardState(String)}.
+	 * <p><strong>Important</strong>: This also discards the stored state handle. This overload
+	 * does not register an additional {@link RemoveCallback}.
 	 *
 	 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
 	 * @throws Exception If the ZooKeeper operation fails
 	 */
-	public void remove(String pathInZooKeeper) throws Exception {
-		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
-
-		client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
+	public void releaseAndTryRemove(String pathInZooKeeper) throws Exception {
+		releaseAndTryRemove(pathInZooKeeper, null);
 	}
 
 	/**
-	 * Removes a state handle from ZooKeeper asynchronously.
+	 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
+	 * The deletion of the state node is executed asynchronously. After the state node has been deleted, the given
+	 * callback is called with the {@link RetrievableStateHandle} of the deleted state node.
 	 *
-	 * <p><strong>Important</strong>: this does not discard the state handle. If you want to
-	 * discard the state handle call {@link #removeAndDiscardState(String)}.
+	 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
+	 * has been executed.
 	 *
-	 * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
-	 * @param callback        The callback after the operation finishes
+	 * @param pathInZooKeeper Path of state handle to remove
+	 * @param callback The callback to execute after a successful deletion. Null if no action needs to be executed.
 	 * @throws Exception If the ZooKeeper operation fails
 	 */
-	public void remove(String pathInZooKeeper, BackgroundCallback callback) throws Exception {
+	public void releaseAndTryRemove(
+			String pathInZooKeeper,
+			@Nullable final RemoveCallback<T> callback) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
-		checkNotNull(callback, "Background callback");
 
-		client.delete().deletingChildrenIfNeeded().inBackground(callback, executor).forPath(pathInZooKeeper);
+		final String path = normalizePath(pathInZooKeeper);
+
+		RetrievableStateHandle<T> stateHandle = null;
+
+		try {
+			stateHandle = get(path, false); // read without locking: we are about to drop our own lock
+		} catch (Exception e) {
+			LOG.warn("Could not retrieve the state handle from node {}.", path, e);
+		}
+
+		release(path);
+
+		final BackgroundCallback backgroundCallback = new RemoveBackgroundCallback<>(stateHandle, callback, path);
+
+		client.delete().inBackground(backgroundCallback, executor).forPath(path); // no deletingChildrenIfNeeded(): fails while other stores still hold lock children
 	}
 
 	/**
-	 * Discards a state handle and removes it from ZooKeeper.
+	 * Releases all lock nodes of this ZooKeeperStateHandleStore and tries to remove all state nodes which
+	 * are not locked anymore.
 	 *
-	 * <p>If you only want to remove the state handle in ZooKeeper call {@link #remove(String)}.
+	 * <p>The delete operations are executed asynchronously. Exceptions thrown while processing individual nodes are collected and rethrown after all nodes have been processed.
 	 *
-	 * @param pathInZooKeeper Path of state handle to discard (expected to start with a '/')
-	 * @throws Exception If the ZooKeeper or state handle operation fails
+	 * @throws Exception if releasing or scheduling the removal of any state node fails
 	 */
-	public void removeAndDiscardState(String pathInZooKeeper) throws Exception {
+	public void releaseAndTryRemoveAll() throws Exception {
+		Collection<String> children = getAllPaths();
+
+		Exception exception = null;
+
+		for (String child : children) {
+			try {
+				releaseAndTryRemove('/' + child);
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+		}
+
+		if (exception != null) {
+			throw new Exception("Could not properly release and try removing all state nodes.", exception);
+		}
+	}
+
+	/**
+	 * Releases this store's lock on the node under the given ZooKeeper path. If no lock exists, then nothing happens.
+	 *
+	 * @param pathInZooKeeper Path describing the ZooKeeper node
+	 * @throws Exception if the delete operation of the lock node fails
+	 */
+	public void release(String pathInZooKeeper) throws Exception {
+		final String path = normalizePath(checkNotNull(pathInZooKeeper, "Path in ZooKeeper"));
+
+		try {
+			client.delete().forPath(getLockPath(path));
+		} catch (KeeperException.NoNodeException ignored) {
+			// the lock node does not exist: never locked by this store or already released
+		} catch (Exception e) {
+			throw new Exception("Could not release the lock: " + getLockPath(path) + '.', e);
+		}
+	}
+
+	/**
+	 * Releases all lock nodes of this ZooKeeperStateHandleStore without removing the state nodes.
+	 *
+	 * @throws Exception if deleting any lock node fails; all nodes are attempted before rethrowing
+	 */
+	public void releaseAll() throws Exception {
+		Collection<String> children = getAllPaths();
+
+		Exception exception = null;
+
+		for (String child : children) {
+			try {
+				release('/' + child);
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+		}
+
+		if (exception != null) {
+			throw new Exception("Could not properly release all state nodes.", exception);
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------------------
+	// Protected methods
+	// ---------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the path of this store's lock node, i.e. the child of the given node named after this store's lock node name.
+	 *
+	 * @param rootPath Root path under which the lock node shall be created
+	 * @return Path of the lock node, {@code rootPath + '/' + lockNode}
+	 */
+	protected String getLockPath(String rootPath) {
+		return rootPath + '/' + lockNode;
+	}
+
+	// ---------------------------------------------------------------------------------------------------------
+	// Private methods
+	// ---------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets a state handle from ZooKeeper and optionally locks it.
+	 *
+	 * <p>Locking creates an ephemeral lock node directly below the state node (see
+	 * {@link #getLockPath(String)}); a lock node that already exists is accepted as is. If locking was
+	 * requested and the state handle cannot be read or deserialized, the lock node is released again
+	 * before the exception propagates.
+	 *
+	 * @param pathInZooKeeper Path in ZooKeeper to get the state handle from; a missing leading "/" is added
+	 * @param lock True if we should lock the node; otherwise false
+	 * @return The state handle
+	 * @throws IOException Thrown if the method failed to deserialize the stored state handle
+	 * @throws Exception Thrown if a ZooKeeper operation failed or if the node to lock does not exist
+	 */
+	@SuppressWarnings("unchecked")
+	private RetrievableStateHandle<T> get(String pathInZooKeeper, boolean lock) throws Exception {
 		checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
-		RetrievableStateHandle<T> stateHandle = get(pathInZooKeeper);
+		// all ZooKeeper calls below use the absolute form of the path
+		final String path = normalizePath(pathInZooKeeper);
+
+		if (lock) {
+			// try to lock the node
+			try {
+				// EPHEMERAL: ZooKeeper removes the lock node by itself once the client's session ends
+				client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
+			} catch (KeeperException.NodeExistsException ignored) {
+				// we have already created the lock
+			} catch (KeeperException.NoNodeException e) {
+				throw new Exception("Cannot lock the node " + path + " since it does not exist.", e);
+			}
+		}
+
+		// becomes true only once the state handle has been deserialized successfully
+		boolean success = false;
+
+		try {
+			byte[] data;
+
+			try {
+				data = client.getData().forPath(path);
+			} catch (Exception e) {
+				throw new Exception("Failed to retrieve state handle data under " + path +
+					" from ZooKeeper.", e);
+			}
+
+			try {
+				RetrievableStateHandle<T> retrievableStateHandle = InstantiationUtil.deserializeObject(
+					data,
+					Thread.currentThread().getContextClassLoader());
 
-		// Delete the state handle from ZooKeeper first
-		client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
+				success = true;
 
-		// Discard the state handle only after it has been successfully deleted from ZooKeeper.
-		// Otherwise we might enter an illegal state after failures (with a state handle in
-		// ZooKeeper, which has already been discarded).
-		stateHandle.discardState();
+				return retrievableStateHandle;
+			} catch (IOException | ClassNotFoundException e) {
+				throw new IOException("Failed to deserialize state handle from ZooKeeper data from " +
+					path + '.', e);
+			}
+		} finally {
+			if (!success && lock) {
+				// release the lock
+				// NOTE(review): this also deletes a lock node that already existed before this call
+				// (the NodeExistsException branch above), not only one created here; and if release()
+				// throws, its exception replaces the original failure. Confirm both are intended.
+				release(path);
+			}
+		}
 	}
 
 	/**
-	 * Discards all available state handles and removes them from ZooKeeper.
+	 * Returns the given ZooKeeper path in absolute form, i.e. starting with "/". A path that already
+	 * starts with "/" is returned unchanged; otherwise a single "/" is prepended.
 	 *
-	 * @throws Exception If a ZooKeeper or state handle operation fails
+	 * @param path Path to normalize; must not be null
+	 * @return The path, guaranteed to start with "/"
 	 */
-	public void removeAndDiscardAllState() throws Exception {
-		final List<Tuple2<RetrievableStateHandle<T>, String>> allStateHandles = getAll();
+	private static String normalizePath(String path) {
+		return path.startsWith("/") ? path : "/" + path;
+	}
 
-		ZKPaths.deleteChildren(
-				client.getZookeeperClient().getZooKeeper(),
-				ZKPaths.fixForNamespace(client.getNamespace(), "/"),
-				false);
+	// ---------------------------------------------------------------------------------------------------------
+	// Utility classes
+	// ---------------------------------------------------------------------------------------------------------
 
-		// Discard the state handles only after they have been successfully deleted from ZooKeeper.
-		for (Tuple2<RetrievableStateHandle<T>, String> stateHandleAndPath : allStateHandles) {
-			stateHandleAndPath.f0.discardState();
+	/**
+	 * Background callback for the delete operation on a state node in ZooKeeper.
+	 *
+	 * <p>If the node was deleted successfully, the callback first runs the given {@link RemoveCallback}
+	 * (if not null) and afterwards discards the given {@link RetrievableStateHandle} (if not null). A failure
+	 * of either step does not prevent the other; all failures are combined and logged. If the node could
+	 * not be deleted because it still has children (lock nodes), only a debug message is logged.
+	 *
+	 * @param <T> Type of the value stored in the RetrievableStateHandle
+	 */
+	private static final class RemoveBackgroundCallback<T extends Serializable> implements BackgroundCallback {
+		@Nullable
+		private final RetrievableStateHandle<T> stateHandle;
+
+		@Nullable
+		private final RemoveCallback<T> callback;
+
+		private final String pathInZooKeeper;
+
+		private RemoveBackgroundCallback(
+			@Nullable RetrievableStateHandle<T> stateHandle,
+			@Nullable RemoveCallback<T> callback,
+			String pathInZooKeeper) {
+
+			this.stateHandle = stateHandle;
+			this.callback = callback;
+			this.pathInZooKeeper = Preconditions.checkNotNull(pathInZooKeeper);
 		}
+
+		@Override
+		public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+			try {
+				if (event.getType() == CuratorEventType.DELETE) {
+					final KeeperException.Code resultCode = KeeperException.Code.get(event.getResultCode());
+
+					if (resultCode == KeeperException.Code.OK) {
+						Exception exception = null;
+
+						if (null != callback) {
+							try {
+								callback.apply(stateHandle);
+							} catch (Throwable e) {
+								exception = new Exception("Could not execute delete action for node " +
+									pathInZooKeeper + '.', e);
+							}
+						}
+
+						if (stateHandle != null) {
+							try {
+								// Discard the state handle; this only happens after its node was deleted
+								stateHandle.discardState();
+							} catch (Throwable e) {
+								// keep the first failure as the primary exception, attach later ones as suppressed
+								exception = ExceptionUtils.firstOrSuppressed(
+									new Exception("Could not discard state handle of node " +
+										pathInZooKeeper + '.', e),
+									exception);
+							}
+						}
+
+						if (exception != null) {
+							throw exception;
+						}
+					} else if (resultCode == KeeperException.Code.NOTEMPTY) {
+						// Could not delete the node because it still contains children/locks
+						LOG.debug("Could not delete node " + pathInZooKeeper + " because it is still locked.");
+					} else {
+						throw new IllegalStateException("Unexpected result code " +
+							resultCode.name() + " in '" + event + "' callback.");
+					}
+				} else {
+					throw new IllegalStateException("Unexpected event type " +
+						event.getType() + " in '" + event + "' callback.");
+				}
+			} catch (Exception e) {
+				// failures are only logged here; they are not propagated back to Curator
+				LOG.warn("Failed to run callback for delete operation on node " + pathInZooKeeper + '.', e);
+			}
+		}
+	}
+
+	/**
+	 * Callback interface for remove calls.
+	 *
+	 * <p>Within this class it is invoked by {@code RemoveBackgroundCallback} after the state node has been
+	 * deleted successfully from ZooKeeper, and before the state handle itself is discarded.
+	 *
+	 * @param <T> Type of the value stored in the RetrievableStateHandle
+	 */
+	public interface RemoveCallback<T extends Serializable> {
+		/**
+		 * Callback method. The parameter can be null if the {@link RetrievableStateHandle} could not be retrieved
+		 * from ZooKeeper.
+		 *
+		 * @param value RetrievableStateHandle retrieved from ZooKeeper, null if it was not retrievable
+		 * @throws FlinkException If the callback failed
+		 */
+		void apply(@Nullable RetrievableStateHandle<T> value) throws FlinkException;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 94bd12f..985c662 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -307,6 +308,14 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			}
 		}
 
+		/**
+		 * Waits on the discard latch for at most the given time.
+		 *
+		 * @param timeout Maximum time to wait, in milliseconds
+		 * @return true if the discard latch reached zero within the timeout; false on timeout or
+		 *         if no discard latch is set
+		 * @throws InterruptedException if the waiting thread is interrupted
+		 */
+		public boolean awaitDiscard(long timeout) throws InterruptedException {
+			return discardLatch != null && discardLatch.await(timeout, TimeUnit.MILLISECONDS);
+		}
+
 		@Override
 		public boolean equals(Object o) {
 			if (this == o) return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 3fd7f1b..0d93289 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -24,50 +24,52 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for basic {@link CompletedCheckpointStore} contract and ZooKeeper state handling.
  */
 public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {
 
-	private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+	private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);
 
-	private final static String CheckpointsPath = "/checkpoints";
+	private static final String CHECKPOINT_PATH = "/checkpoints";
 
+	/** Shuts down the shared ZooKeeper test environment once, after all tests of this class have run. */
 	@AfterClass
 	public static void tearDown() throws Exception {
-		if (ZooKeeper != null) {
-			ZooKeeper.shutdown();
+		if (ZOOKEEPER != null) {
+			ZOOKEEPER.shutdown();
 		}
 	}
 
+	/** Clears the shared ZooKeeper test environment (via {@code deleteAll()}) before each test. */
 	@Before
 	public void cleanUp() throws Exception {
-		ZooKeeper.deleteAll();
+		ZOOKEEPER.deleteAll();
 	}
 
+	/**
+	 * Creates a {@link ZooKeeperCompletedCheckpointStore} below {@code CHECKPOINT_PATH} whose state handles
+	 * are kept on the heap ({@link HeapStateStorageHelper}). All stores created by this method use the
+	 * same client of the test environment ({@code ZOOKEEPER.getClient()}) and a direct executor.
+	 *
+	 * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints the store retains
+	 * @return A new checkpoint store backed by the test ZooKeeper
+	 */
 	@Override
-	protected AbstractCompletedCheckpointStore createCompletedCheckpoints(
-			int maxNumberOfCheckpointsToRetain) throws Exception {
-
+	protected ZooKeeperCompletedCheckpointStore createCompletedCheckpoints(int maxNumberOfCheckpointsToRetain) throws Exception {
 		return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain,
-			ZooKeeper.createClient(), CheckpointsPath, new RetrievableStateStorageHelper<CompletedCheckpoint>() {
-			@Override
-			public RetrievableStateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
-				return new HeapRetrievableStateHandle<>(state);
-			}
-		}, Executors.directExecutor());
+			ZOOKEEPER.getClient(),
+			CHECKPOINT_PATH,
+			new HeapStateStorageHelper(),
+			Executors.directExecutor());
 	}
 
 	// ---------------------------------------------------------------------------------------------
@@ -95,7 +97,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		verifyCheckpointRegistered(expected[2].getOperatorStates().values(), checkpoints.sharedStateRegistry);
 
 		// All three should be in ZK
-		assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
+		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
 		resetCheckpoint(expected[0].getOperatorStates().values());
@@ -105,7 +107,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		// Recover TODO!!! clear registry!
 		checkpoints.recover();
 
-		assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
+		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 		assertEquals(expected[2], checkpoints.getLatestCheckpoint());
 
@@ -130,18 +132,18 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 	 */
 	@Test
 	public void testShutdownDiscardsCheckpoints() throws Exception {
-		CuratorFramework client = ZooKeeper.getClient();
+		CuratorFramework client = ZOOKEEPER.getClient();
 
 		CompletedCheckpointStore store = createCompletedCheckpoints(1);
 		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
+		assertNotNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		store.shutdown(JobStatus.FINISHED);
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
-		assertNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
+		assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		store.recover();
 
@@ -149,24 +151,30 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 	}
 
 	/**
-	 * Tests that suspends keeps all checkpoints (as they can be recovered
-	 * later by the ZooKeeper store).
+	 * Tests that suspends keeps all checkpoints (so that they can be recovered
+	 * later by the ZooKeeper store). Furthermore, suspending a job should release
+	 * all locks.
 	 */
 	@Test
 	public void testSuspendKeepsCheckpoints() throws Exception {
-		CuratorFramework client = ZooKeeper.getClient();
+		CuratorFramework client = ZOOKEEPER.getClient();
 
 		CompletedCheckpointStore store = createCompletedCheckpoints(1);
 		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
+		assertNotNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		store.shutdown(JobStatus.SUSPENDED);
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
+
+		final String checkpointPath = CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID());
+		Stat stat = client.checkExists().forPath(checkpointPath);
+
+		assertNotNull("The checkpoint node should exist.", stat);
+		assertEquals("The checkpoint node should not be locked.", 0, stat.getNumChildren());
 
 		// Recover again
 		store.recover();
@@ -201,24 +209,91 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(checkpoints.get(checkpoints.size() -1), latestCheckpoint);
 	}
 
+	/**
+	 * FLINK-6612
+	 *
+	 * <p>Checks that a concurrent checkpoint completion won't discard a checkpoint which has been
+	 * recovered by a different completed checkpoint store.
+	 */
+	@Test
+	public void testConcurrentCheckpointOperations() throws Exception {
+		final int numberOfCheckpoints = 1;
+		// short wait, only used to check that a discard does NOT happen
+		final long waitingTimeout = 50L;
+		// upper bound for the expected discard, so that a regression fails the test instead of hanging it
+		final long discardTimeout = 10000L;
+
+		ZooKeeperCompletedCheckpointStore zkCheckpointStore1 = createCompletedCheckpoints(numberOfCheckpoints);
+		ZooKeeperCompletedCheckpointStore zkCheckpointStore2 = createCompletedCheckpoints(numberOfCheckpoints);
+
+		TestCompletedCheckpoint completedCheckpoint = createCheckpoint(1);
+
+		// complete the first checkpoint
+		zkCheckpointStore1.addCheckpoint(completedCheckpoint);
+
+		// recover the checkpoint by a different checkpoint store
+		zkCheckpointStore2.recover();
+
+		CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint();
+		assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint);
+		TestCompletedCheckpoint recoveredTestCheckpoint = (TestCompletedCheckpoint) recoveredCheckpoint;
+
+		// Check that the recovered checkpoint is not yet discarded
+		assertFalse(recoveredTestCheckpoint.isDiscarded());
+
+		// complete another checkpoint --> this should remove the first checkpoint from the store
+		// because the number of retained checkpoints == 1
+		TestCompletedCheckpoint completedCheckpoint2 = createCheckpoint(2);
+		zkCheckpointStore1.addCheckpoint(completedCheckpoint2);
+
+		List<CompletedCheckpoint> allCheckpoints = zkCheckpointStore1.getAllCheckpoints();
+
+		// check that we have removed the first checkpoint from zkCheckpointStore1
+		assertEquals(Collections.singletonList(completedCheckpoint2), allCheckpoints);
+
+		// let's wait a little bit to see that no discard operation will be executed
+		assertFalse("The checkpoint should not have been discarded.", recoveredTestCheckpoint.awaitDiscard(waitingTimeout));
+
+		// check that we have not discarded the first completed checkpoint
+		assertFalse(recoveredTestCheckpoint.isDiscarded());
+
+		TestCompletedCheckpoint completedCheckpoint3 = createCheckpoint(3);
+
+		// this should release the last lock on completedCheckpoint and thus discard it
+		zkCheckpointStore2.addCheckpoint(completedCheckpoint3);
+
+		// the checkpoint should be discarded eventually because there is no lock on it anymore;
+		// the wait is bounded so that a missing discard fails the test instead of blocking forever
+		assertTrue("The checkpoint should have been discarded after its last lock was released.",
+			recoveredTestCheckpoint.awaitDiscard(discardTimeout));
+	}
+
+
+	/**
+	 * {@link RetrievableStateStorageHelper} that wraps each completed checkpoint into a
+	 * {@link HeapRetrievableStateHandle}, i.e. keeps the checkpoint on the heap instead of external storage.
+	 */
+	static class HeapStateStorageHelper implements RetrievableStateStorageHelper<CompletedCheckpoint> {
+
+		@Override
+		public RetrievableStateHandle<CompletedCheckpoint> store(CompletedCheckpoint checkpoint) throws Exception {
+			final RetrievableStateHandle<CompletedCheckpoint> handle = new HeapRetrievableStateHandle<>(checkpoint);
+			return handle;
+		}
+	}
+
 	static class HeapRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
 
 		private static final long serialVersionUID = -268548467968932L;
 
+		/** Source of unique keys; a handle only stores its key, the state itself lives in {@link #stateMap}. */
+		private static final AtomicInteger nextKey = new AtomicInteger(0);
+
+		/**
+		 * Heap storage for the states, keyed by handle key. Static fields are not serialized, so a handle that
+		 * is serialized to ZooKeeper and deserialized again still resolves to the same state object.
+		 * Synchronized because discardState() may run on a ZooKeeper/Curator background-callback thread
+		 * while the test thread adds or retrieves states.
+		 */
+		private static final Map<Integer, Object> stateMap =
+			Collections.synchronizedMap(new HashMap<Integer, Object>());
+
+		private final int key;
+
 		public HeapRetrievableStateHandle(T state) {
-			this.state = state;
+			key = nextKey.getAndIncrement();
+			stateMap.put(key, state);
 		}
 
-		private T state;
-
 		@Override
+		@SuppressWarnings("unchecked")
 		public T retrieveState() throws Exception {
-			return state;
+			// safe: only the constructor stores values, and it stores a T under this handle's key
+			return (T) stateMap.get(key);
 		}
 
 		@Override
 		public void discardState() throws Exception {
-			state = null;
+			stateMap.remove(key);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 0d22dc6..7d22d8e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -110,7 +110,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 
 		ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
 		whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByName();
+		doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
 
 		final int numCheckpointsToRetain = 1;
 
@@ -126,7 +126,6 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 		when(
 			client
 				.delete()
-				.deletingChildrenIfNeeded()
 				.inBackground(any(BackgroundCallback.class), any(Executor.class))
 		).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
 			@Override
@@ -150,13 +149,13 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 		});
 
 		final String checkpointsPath = "foobar";
-		final RetrievableStateStorageHelper<CompletedCheckpoint> stateSotrage = mock(RetrievableStateStorageHelper.class);
+		final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = mock(RetrievableStateStorageHelper.class);
 
 		ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
 			numCheckpointsToRetain,
 			client,
 			checkpointsPath,
-			stateSotrage,
+			stateStorage,
 			Executors.directExecutor());
 
 		zooKeeperCompletedCheckpointStore.recover();
@@ -209,9 +208,9 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 				
 				return retrievableStateHandle;
 			}
-		}).when(zookeeperStateHandleStoreMock).add(anyString(), any(CompletedCheckpoint.class));
+		}).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class));
 		
-		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).remove(anyString(), any(BackgroundCallback.class));
+		doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(), any(ZooKeeperStateHandleStore.RemoveCallback.class));
 		
 		final int numCheckpointsToRetain = 1;
 		final String checkpointsPath = "foobar";

http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
deleted file mode 100644
index 4dc4c6b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
+++ /dev/null
@@ -1,642 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.zookeeper;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.TestLogger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for basic {@link ZooKeeperStateHandleStore} behaviour.
- *
- * <p> Tests include:
- * <ul>
- * <li>Expected usage of operations</li>
- * <li>Correct ordering of ZooKeeper and state handle operations</li>
- * </ul>
- */
-public class ZooKeeperStateHandleStoreITCase extends TestLogger {
-
-	private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
-
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (ZooKeeper != null) {
-			ZooKeeper.shutdown();
-		}
-	}
-
-	@Before
-	public void cleanUp() throws Exception {
-		ZooKeeper.deleteAll();
-	}
-
-	/**
-	 * Tests add operation with default {@link CreateMode}.
-	 */
-	@Test
-	public void testAdd() throws Exception {
-		LongStateStorage longStateStorage = new LongStateStorage();
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZooKeeper.getClient(), longStateStorage, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testAdd";
-		final Long state = 1239712317L;
-
-		// Test
-		store.add(pathInZooKeeper, state);
-
-		// Verify
-		// State handle created
-		assertEquals(1, store.getAll().size());
-		assertEquals(state, store.get(pathInZooKeeper).retrieveState());
-
-		// Path created and is persistent
-		Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
-		assertNotNull(stat);
-		assertEquals(0, stat.getEphemeralOwner());
-
-		// Data is equal
-		@SuppressWarnings("unchecked")
-		Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
-				ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-				ClassLoader.getSystemClassLoader())).retrieveState();
-
-		assertEquals(state, actual);
-	}
-
-	/**
-	 * Tests that {@link CreateMode} is respected.
-	 */
-	@Test
-	public void testAddWithCreateMode() throws Exception {
-		LongStateStorage longStateStorage = new LongStateStorage();
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<Long>(
-				ZooKeeper.getClient(), longStateStorage, Executors.directExecutor());
-
-		// Config
-		Long state = 3457347234L;
-
-		CreateMode[] modes = CreateMode.values();
-		for (int i = 0; i < modes.length; i++) {
-			CreateMode mode = modes[i];
-			state += i;
-
-			String pathInZooKeeper = "/testAddWithCreateMode" + mode.name();
-
-			// Test
-			store.add(pathInZooKeeper, state, mode);
-
-			if (mode.isSequential()) {
-				// Figure out the sequential ID
-				List<String> paths = ZooKeeper.getClient().getChildren().forPath("/");
-				for (String p : paths) {
-					if (p.startsWith("testAddWithCreateMode" + mode.name())) {
-						pathInZooKeeper = "/" + p;
-						break;
-					}
-				}
-			}
-
-			// Verify
-			// State handle created
-			assertEquals(i + 1, store.getAll().size());
-			assertEquals(state, longStateStorage.getStateHandles().get(i).retrieveState());
-
-			// Path created
-			Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
-
-			assertNotNull(stat);
-
-			// Is ephemeral or persistent
-			if (mode.isEphemeral()) {
-				assertTrue(stat.getEphemeralOwner() != 0);
-			}
-			else {
-				assertEquals(0, stat.getEphemeralOwner());
-			}
-
-			// Data is equal
-			@SuppressWarnings("unchecked")
-			Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
-					ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-					ClassLoader.getSystemClassLoader())).retrieveState();
-
-			assertEquals(state, actual);
-		}
-	}
-
-	/**
-	 * Tests that an existing path throws an Exception.
-	 */
-	@Test(expected = Exception.class)
-	public void testAddAlreadyExistingPath() throws Exception {
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath");
-
-		store.add("/testAddAlreadyExistingPath", 1L);
-	}
-
-	/**
-	 * Tests that the created state handle is discarded if ZooKeeper create fails.
-	 */
-	@Test
-	public void testAddDiscardStateHandleAfterFailure() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		CuratorFramework client = spy(ZooKeeper.getClient());
-		when(client.create()).thenThrow(new RuntimeException("Expected test Exception."));
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
-		final Long state = 81282227L;
-
-		try {
-			// Test
-			store.add(pathInZooKeeper, state);
-			fail("Did not throw expected exception");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify
-		// State handle created and discarded
-		assertEquals(1, stateHandleProvider.getStateHandles().size());
-		assertEquals(state, stateHandleProvider.getStateHandles().get(0).retrieveState());
-		assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
-	}
-
-	/**
-	 * Tests that a state handle is replaced.
-	 */
-	@Test
-	public void testReplace() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testReplace";
-		final Long initialState = 30968470898L;
-		final Long replaceState = 88383776661L;
-
-		// Test
-		store.add(pathInZooKeeper, initialState);
-		store.replace(pathInZooKeeper, 0, replaceState);
-
-		// Verify
-		// State handles created
-		assertEquals(2, stateHandleProvider.getStateHandles().size());
-		assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState());
-		assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState());
-
-		// Path created and is persistent
-		Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
-		assertNotNull(stat);
-		assertEquals(0, stat.getEphemeralOwner());
-
-		// Data is equal
-		@SuppressWarnings("unchecked")
-		Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
-				ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-				ClassLoader.getSystemClassLoader())).retrieveState();
-
-		assertEquals(replaceState, actual);
-	}
-
-	/**
-	 * Tests that a non existing path throws an Exception.
-	 */
-	@Test(expected = Exception.class)
-	public void testReplaceNonExistingPath() throws Exception {
-		RetrievableStateStorageHelper<Long> stateStorage = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateStorage, Executors.directExecutor());
-
-		store.replace("/testReplaceNonExistingPath", 0, 1L);
-	}
-
-	/**
-	 * Tests that the replace state handle is discarded if ZooKeeper setData fails.
-	 */
-	@Test
-	public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		CuratorFramework client = spy(ZooKeeper.getClient());
-		when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				client, stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
-		final Long initialState = 30968470898L;
-		final Long replaceState = 88383776661L;
-
-		// Test
-		store.add(pathInZooKeeper, initialState);
-
-		try {
-			store.replace(pathInZooKeeper, 0, replaceState);
-			fail("Did not throw expected exception");
-		}
-		catch (Exception ignored) {
-		}
-
-		// Verify
-		// State handle created and discarded
-		assertEquals(2, stateHandleProvider.getStateHandles().size());
-		assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState());
-		assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState());
-		assertEquals(1, stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls());
-
-		// Initial value
-		@SuppressWarnings("unchecked")
-		Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject(
-				ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-				ClassLoader.getSystemClassLoader())).retrieveState();
-
-		assertEquals(initialState, actual);
-	}
-
-	/**
-	 * Tests get operation.
-	 */
-	@Test
-	public void testGetAndExists() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testGetAndExists";
-		final Long state = 311222268470898L;
-
-		// Test
-		assertEquals(-1, store.exists(pathInZooKeeper));
-
-		store.add(pathInZooKeeper, state);
-		RetrievableStateHandle<Long> actual = store.get(pathInZooKeeper);
-
-		// Verify
-		assertEquals(state, actual.retrieveState());
-		assertTrue(store.exists(pathInZooKeeper) >= 0);
-	}
-
-	/**
-	 * Tests that a non existing path throws an Exception.
-	 */
-	@Test(expected = Exception.class)
-	public void testGetNonExistingPath() throws Exception {
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		store.get("/testGetNonExistingPath");
-	}
-
-	/**
-	 * Tests that all added state is returned.
-	 */
-	@Test
-	public void testGetAll() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testGetAll";
-
-		final Set<Long> expected = new HashSet<>();
-		expected.add(311222268470898L);
-		expected.add(132812888L);
-		expected.add(27255442L);
-		expected.add(11122233124L);
-
-		// Test
-		for (long val : expected) {
-			store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL);
-		}
-
-		for (Tuple2<RetrievableStateHandle<Long>, String> val : store.getAll()) {
-			assertTrue(expected.remove(val.f0.retrieveState()));
-		}
-		assertEquals(0, expected.size());
-	}
-
-	/**
-	 * Tests that the state is returned sorted.
-	 */
-	@Test
-	public void testGetAllSortedByName() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testGetAllSortedByName";
-
-		final Long[] expected = new Long[] {
-				311222268470898L, 132812888L, 27255442L, 11122233124L };
-
-		// Test
-		for (long val : expected) {
-			store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL);
-		}
-
-		List<Tuple2<RetrievableStateHandle<Long>, String>> actual = store.getAllSortedByName();
-		assertEquals(expected.length, actual.size());
-
-		for (int i = 0; i < expected.length; i++) {
-			assertEquals(expected[i], actual.get(i).f0.retrieveState());
-		}
-	}
-
-	/**
-	 * Tests that state handles are correctly removed.
-	 */
-	@Test
-	public void testRemove() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testRemove";
-		final Long state = 27255442L;
-
-		store.add(pathInZooKeeper, state);
-
-		// Test
-		store.remove(pathInZooKeeper);
-
-		// Verify discarded
-		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
-	}
-
-	/**
-	 * Tests that state handles are correctly removed with a callback.
-	 */
-	@Test
-	public void testRemoveWithCallback() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testRemoveWithCallback";
-		final Long state = 27255442L;
-
-		store.add(pathInZooKeeper, state);
-
-		final CountDownLatch sync = new CountDownLatch(1);
-		BackgroundCallback callback = mock(BackgroundCallback.class);
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				sync.countDown();
-				return null;
-			}
-		}).when(callback).processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));
-
-		// Test
-		store.remove(pathInZooKeeper, callback);
-
-		// Verify discarded and callback called
-		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
-
-		sync.await();
-
-		verify(callback, times(1))
-				.processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));
-	}
-
-	/**
-	 * Tests that state handles are correctly discarded.
-	 */
-	@Test
-	public void testRemoveAndDiscardState() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testDiscard";
-		final Long state = 27255442L;
-
-		store.add(pathInZooKeeper, state);
-
-		// Test
-		store.removeAndDiscardState(pathInZooKeeper);
-
-		// Verify discarded
-		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
-	}
-
-	/** Tests that all state handles are correctly discarded. */
-	@Test
-	public void testRemoveAndDiscardAllState() throws Exception {
-		// Setup
-		LongStateStorage stateHandleProvider = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-				ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());
-
-		// Config
-		final String pathInZooKeeper = "/testDiscardAll";
-
-		final Set<Long> expected = new HashSet<>();
-		expected.add(311222268470898L);
-		expected.add(132812888L);
-		expected.add(27255442L);
-		expected.add(11122233124L);
-
-		// Test
-		for (long val : expected) {
-			store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL);
-		}
-
-		store.removeAndDiscardAllState();
-
-		// Verify all discarded
-		assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());
-	}
-
-	/**
-	 * Tests that the ZooKeeperStateHandleStore can handle corrupted data by ignoring the respective
-	 * ZooKeeper ZNodes.
-	 */
-	@Test
-	public void testCorruptedData() throws Exception {
-		LongStateStorage stateStorage = new LongStateStorage();
-
-		ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
-			ZooKeeper.getClient(),
-			stateStorage,
-			Executors.directExecutor());
-
-		final Collection<Long> input = new HashSet<>();
-		input.add(1L);
-		input.add(2L);
-		input.add(3L);
-
-		for (Long aLong : input) {
-			store.add("/" + aLong, aLong);
-		}
-
-		// corrupt one of the entries
-		ZooKeeper.getClient().setData().forPath("/" + 2, new byte[2]);
-
-		List<Tuple2<RetrievableStateHandle<Long>, String>> allEntries = store.getAll();
-
-		Collection<Long> expected = new HashSet<>(input);
-		expected.remove(2L);
-
-		Collection<Long> actual = new HashSet<>(expected.size());
-
-		for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
-			actual.add(entry.f0.retrieveState());
-		}
-
-		assertEquals(expected, actual);
-
-		// check the same for the all sorted by name call
-		allEntries = store.getAllSortedByName();
-
-		actual.clear();
-
-		for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
-			actual.add(entry.f0.retrieveState());
-		}
-
-		assertEquals(expected, actual);
-	}
-
-	// ---------------------------------------------------------------------------------------------
-	// Simple test helpers
-	// ---------------------------------------------------------------------------------------------
-
-	private static class LongStateStorage implements RetrievableStateStorageHelper<Long> {
-
-		private final List<LongRetrievableStateHandle> stateHandles = new ArrayList<>();
-
-		@Override
-		public RetrievableStateHandle<Long> store(Long state) throws Exception {
-			LongRetrievableStateHandle stateHandle = new LongRetrievableStateHandle(state);
-			stateHandles.add(stateHandle);
-
-			return stateHandle;
-		}
-
-		List<LongRetrievableStateHandle> getStateHandles() {
-			return stateHandles;
-		}
-	}
-
-	private static class LongRetrievableStateHandle implements RetrievableStateHandle<Long> {
-
-		private static final long serialVersionUID = -3555329254423838912L;
-
-		private final Long state;
-
-		private int numberOfDiscardCalls;
-
-		public LongRetrievableStateHandle(Long state) {
-			this.state = state;
-		}
-
-		@Override
-		public Long retrieveState() throws Exception {
-			return state;
-		}
-
-		@Override
-		public void discardState() throws Exception {
-			numberOfDiscardCalls++;
-		}
-
-		@Override
-		public long getStateSize() {
-			return 0;
-		}
-
-		public int getNumberOfDiscardCalls() {
-			return numberOfDiscardCalls;
-		}
-	}
-}


[3/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore

Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
new file mode 100644
index 0000000..2a6975a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Random;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+public class IncrementalKeyedStateHandleTest {
+
+	/**
+	 * This test checks that for an unregistered {@link IncrementalKeyedStateHandle} all state
+	 * (including shared) is discarded.
+	 */
+	@Test
+	public void testUnregisteredDiscarding() throws Exception {
+		IncrementalKeyedStateHandle stateHandle = create(new Random(42));
+
+		stateHandle.discardState();
+
+		for (StreamStateHandle handle : stateHandle.getPrivateState().values()) {
+			verify(handle).discardState();
+		}
+
+		for (StreamStateHandle handle : stateHandle.getSharedState().values()) {
+			verify(handle).discardState();
+		}
+
+		verify(stateHandle.getMetaStateHandle()).discardState();
+	}
+
+	/**
+	 * This test checks that for a registered {@link IncrementalKeyedStateHandle}, discards respect
+	 * all shared state and only discard it once all references are released.
+	 */
+	@Test
+	public void testSharedStateDeRegistration() throws Exception {
+
+		Random rnd = new Random(42);
+
+		SharedStateRegistry registry = spy(new SharedStateRegistry());
+
+		// Create two state handles with overlapping shared state
+		IncrementalKeyedStateHandle stateHandle1 = create(new Random(42));
+		IncrementalKeyedStateHandle stateHandle2 = create(new Random(42));
+
+		// Neither handle should be registered or discarded yet.
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+			stateHandle1.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+			verify(registry, times(0)).unregisterReference(registryKey);
+			verify(entry.getValue(), times(0)).discardState();
+		}
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+			stateHandle2.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+			verify(registry, times(0)).unregisterReference(registryKey);
+			verify(entry.getValue(), times(0)).discardState();
+		}
+
+		// Now we register both ...
+		stateHandle1.registerSharedStates(registry);
+		stateHandle2.registerSharedStates(registry);
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry :
+			stateHandle1.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(stateHandleEntry.getKey());
+
+			verify(registry).registerReference(
+				registryKey,
+				stateHandleEntry.getValue());
+		}
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry :
+			stateHandle2.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(stateHandleEntry.getKey());
+
+			verify(registry).registerReference(
+				registryKey,
+				stateHandleEntry.getValue());
+		}
+
+		// We discard the first
+		stateHandle1.discardState();
+
+		// Should be unregistered, non-shared discarded, shared not discarded
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+			stateHandle1.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+			verify(registry, times(1)).unregisterReference(registryKey);
+			verify(entry.getValue(), times(0)).discardState();
+		}
+
+		for (StreamStateHandle handle :
+			stateHandle2.getSharedState().values()) {
+
+			verify(handle, times(0)).discardState();
+		}
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry :
+			stateHandle1.getPrivateState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(handleEntry.getKey());
+
+			verify(registry, times(0)).unregisterReference(registryKey);
+			verify(handleEntry.getValue(), times(1)).discardState();
+		}
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry :
+			stateHandle2.getPrivateState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(handleEntry.getKey());
+
+			verify(registry, times(0)).unregisterReference(registryKey);
+			verify(handleEntry.getValue(), times(0)).discardState();
+		}
+
+		verify(stateHandle1.getMetaStateHandle(), times(1)).discardState();
+		verify(stateHandle2.getMetaStateHandle(), times(0)).discardState();
+
+		// We discard the second
+		stateHandle2.discardState();
+
+
+		// Now everything should be unregistered and discarded
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+			stateHandle1.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+			verify(registry, times(2)).unregisterReference(registryKey);
+			verify(entry.getValue()).discardState();
+		}
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+			stateHandle2.getSharedState().entrySet()) {
+
+			SharedStateRegistryKey registryKey =
+				stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+			verify(registry, times(2)).unregisterReference(registryKey);
+			verify(entry.getValue()).discardState();
+		}
+
+		verify(stateHandle1.getMetaStateHandle(), times(1)).discardState();
+		verify(stateHandle2.getMetaStateHandle(), times(1)).discardState();
+	}
+
+	private static IncrementalKeyedStateHandle create(Random rnd) {
+		return new IncrementalKeyedStateHandle(
+			"test",
+			KeyGroupRange.of(0, 0),
+			1L,
+			placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
+			placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
+			spy(CheckpointTestUtils.createDummyStreamStateHandle(rnd)));
+	}
+
+	private static Map<StateHandleID, StreamStateHandle> placeSpies(
+		Map<StateHandleID, StreamStateHandle> map) {
+
+		for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) {
+			entry.setValue(spy(entry.getValue()));
+		}
+		return map;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 03e2a13..4104595 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -40,14 +40,14 @@ public class SharedStateRegistryTest {
 
 		// register one state
 		TestSharedState firstState = new TestSharedState("first");
-		SharedStateRegistry.Result result = sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), firstState);
+		SharedStateRegistry.Result result = sharedStateRegistry.registerReference(firstState.getRegistrationKey(), firstState);
 		assertEquals(1, result.getReferenceCount());
 		assertTrue(firstState == result.getReference());
 		assertFalse(firstState.isDiscarded());
 
 		// register another state
 		TestSharedState secondState = new TestSharedState("second");
-		result = sharedStateRegistry.registerNewReference(secondState.getRegistrationKey(), secondState);
+		result = sharedStateRegistry.registerReference(secondState.getRegistrationKey(), secondState);
 		assertEquals(1, result.getReferenceCount());
 		assertTrue(secondState == result.getReference());
 		assertFalse(firstState.isDiscarded());
@@ -55,7 +55,7 @@ public class SharedStateRegistryTest {
 
 		// attempt to register state under an existing key
 		TestSharedState firstStatePrime = new TestSharedState(firstState.getRegistrationKey().getKeyString());
-		result = sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), firstStatePrime);
+		result = sharedStateRegistry.registerReference(firstState.getRegistrationKey(), firstStatePrime);
 		assertEquals(2, result.getReferenceCount());
 		assertFalse(firstStatePrime == result.getReference());
 		assertTrue(firstState == result.getReference());
@@ -63,19 +63,19 @@ public class SharedStateRegistryTest {
 		assertFalse(firstState.isDiscarded());
 
 		// reference the first state again
-		result = sharedStateRegistry.obtainReference(firstState.getRegistrationKey());
+		result = sharedStateRegistry.registerReference(firstState.getRegistrationKey(), firstState);
 		assertEquals(3, result.getReferenceCount());
 		assertTrue(firstState == result.getReference());
 		assertFalse(firstState.isDiscarded());
 
 		// unregister the second state
-		result = sharedStateRegistry.releaseReference(secondState.getRegistrationKey());
+		result = sharedStateRegistry.unregisterReference(secondState.getRegistrationKey());
 		assertEquals(0, result.getReferenceCount());
 		assertTrue(result.getReference() == null);
 		assertTrue(secondState.isDiscarded());
 
 		// unregister the first state
-		result = sharedStateRegistry.releaseReference(firstState.getRegistrationKey());
+		result = sharedStateRegistry.unregisterReference(firstState.getRegistrationKey());
 		assertEquals(2, result.getReferenceCount());
 		assertTrue(firstState == result.getReference());
 		assertFalse(firstState.isDiscarded());
@@ -87,7 +87,7 @@ public class SharedStateRegistryTest {
 	@Test(expected = IllegalStateException.class)
 	public void testUnregisterWithUnexistedKey() {
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-		sharedStateRegistry.releaseReference(new SharedStateRegistryKey("non-existent"));
+		sharedStateRegistry.unregisterReference(new SharedStateRegistryKey("non-existent"));
 	}
 
 	private static class TestSharedState implements StreamStateHandle {

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index b1927f1..8d4a38e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -539,7 +539,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		snapshot2.registerSharedStates(sharedStateRegistry);
 
-		snapshot.unregisterSharedStates(sharedStateRegistry);
 		snapshot.discardState();
 
 		backend.dispose();
@@ -631,7 +630,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		snapshot2.registerSharedStates(sharedStateRegistry);
 
-		snapshot.unregisterSharedStates(sharedStateRegistry);
 		snapshot.discardState();
 
 		backend.dispose();

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index 11a03cc..2251e46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -52,10 +52,12 @@ public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckp
 
 	@Override
 	public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
-		checkpoints.addLast(checkpoint);
 
 		checkpoint.registerSharedStates(sharedStateRegistry);
 
+		checkpoints.addLast(checkpoint);
+
+
 		if (checkpoints.size() > 1) {
 			CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
 			checkpointToSubsume.discardOnSubsume(sharedStateRegistry);
@@ -76,7 +78,6 @@ public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckp
 			suspended.clear();
 
 			for (CompletedCheckpoint checkpoint : checkpoints) {
-				sharedStateRegistry.unregisterAll(checkpoint.getOperatorStates().values());
 				suspended.add(checkpoint);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index fea2b79..6ad7708 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -147,8 +148,14 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			}
 			case ROCKSDB_INCREMENTAL: {
 				String rocksDb = tempFolder.newFolder().getAbsolutePath();
+				String backups = tempFolder.newFolder().getAbsolutePath();
+				// we use the fs backend with a small threshold here to test the behaviour with file
+				// references, not self-contained byte handles
 				RocksDBStateBackend rdb =
-					new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
+					new RocksDBStateBackend(
+						new FsStateBackend(
+							new Path("file://" + backups).toUri(), 16),
+						true);
 				rdb.setDbStoragePath(rocksDb);
 				this.stateBackend = rdb;
 				break;

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 6c70b87..f9af603 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -20,9 +20,20 @@ package org.apache.flink.test.recovery;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
 import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -35,6 +46,12 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
@@ -43,18 +60,20 @@ import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
-import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.testutils.junit.RetryRule;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -127,7 +146,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 	private static final int Parallelism = 8;
 
-	private static CountDownLatch CompletedCheckpointsLatch = new CountDownLatch(2);
+	private static CountDownLatch CompletedCheckpointsLatch = new CountDownLatch(4);
 
 	private static AtomicLongArray RecoveredStates = new AtomicLongArray(Parallelism);
 
@@ -137,182 +156,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 	private static long LastElement = -1;
 
-	/**
-	 * Simple checkpointed streaming sum.
-	 *
-	 * <p>The sources (Parallelism) count until sequenceEnd. The sink (1) sums up all counts and
-	 * returns it to the main thread via a static variable. We wait until some checkpoints are
-	 * completed and sanity check that the sources recover with an updated state to make sure that
-	 * this test actually tests something.
-	 */
-	@Test
-	@RetryOnFailure(times=1)
-	public void testCheckpointedStreamingSumProgram() throws Exception {
-		// Config
-		final int checkpointingInterval = 200;
-		final int sequenceEnd = 5000;
-		final long expectedSum = Parallelism * sequenceEnd * (sequenceEnd + 1) / 2;
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-		env.setParallelism(Parallelism);
-		env.enableCheckpointing(checkpointingInterval);
-
-		env
-				.addSource(new CheckpointedSequenceSource(sequenceEnd))
-				.addSink(new CountingSink())
-				.setParallelism(1);
-
-		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-
-		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(ZooKeeper
-				.getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toURI().toString());
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism);
-
-		ActorSystem testSystem = null;
-		final JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
-		LeaderRetrievalService leaderRetrievalService = null;
-		ActorSystem taskManagerSystem = null;
-		final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-			config,
-			TestingUtils.defaultExecutor(),
-			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-		try {
-			final Deadline deadline = TestTimeOut.fromNow();
-
-			// Test actor system
-			testSystem = AkkaUtils.createActorSystem(new Configuration(),
-					new Some<>(new Tuple2<String, Object>("localhost", 0)));
-
-			// The job managers
-			jobManagerProcess[0] = new JobManagerProcess(0, config);
-			jobManagerProcess[1] = new JobManagerProcess(1, config);
-
-			jobManagerProcess[0].startProcess();
-			jobManagerProcess[1].startProcess();
-
-			// Leader listener
-			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
-			leaderRetrievalService.start(leaderListener);
-
-			// The task manager
-			taskManagerSystem = AkkaUtils.createActorSystem(
-					config, Option.apply(new Tuple2<String, Object>("localhost", 0)));
-			TaskManager.startTaskManagerComponentsAndActor(
-				config,
-				ResourceID.generate(),
-				taskManagerSystem,
-				highAvailabilityServices,
-				"localhost",
-				Option.<String>empty(),
-				false,
-				TaskManager.class);
-
-			{
-				// Initial submission
-				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
-				String leaderAddress = leaderListener.getAddress();
-				UUID leaderId = leaderListener.getLeaderSessionID();
-
-				// Get the leader ref
-				ActorRef leaderRef = AkkaUtils.getActorRef(
-						leaderAddress, testSystem, deadline.timeLeft());
-				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
-
-				// Submit the job in detached mode
-				leader.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
-
-				JobManagerActorTestUtils.waitForJobStatus(
-						jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
-			}
-
-			// Who's the boss?
-			JobManagerProcess leadingJobManagerProcess;
-			if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress())) {
-				leadingJobManagerProcess = jobManagerProcess[0];
-			}
-			else {
-				leadingJobManagerProcess = jobManagerProcess[1];
-			}
-
-			CompletedCheckpointsLatch.await();
-
-			// Kill the leading job manager process
-			leadingJobManagerProcess.destroy();
-
-			{
-				// Recovery by the standby JobManager
-				leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
-				String leaderAddress = leaderListener.getAddress();
-				UUID leaderId = leaderListener.getLeaderSessionID();
-
-				ActorRef leaderRef = AkkaUtils.getActorRef(
-						leaderAddress, testSystem, deadline.timeLeft());
-				ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
-
-				JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
-						leader, deadline.timeLeft());
-			}
-
-			// Wait to finish
-			FinalCountLatch.await();
-
-			assertEquals(expectedSum, (long) FinalCount.get());
-
-			for (int i = 0; i < Parallelism; i++) {
-				assertNotEquals(0, RecoveredStates.get(i));
-			}
-		}
-		catch (Throwable t) {
-			// Reset all static state for test retries
-			CompletedCheckpointsLatch = new CountDownLatch(2);
-			RecoveredStates = new AtomicLongArray(Parallelism);
-			FinalCountLatch = new CountDownLatch(1);
-			FinalCount = new AtomicReference<>();
-			LastElement = -1;
-
-			// Print early (in some situations the process logs get too big
-			// for Travis and the root problem is not shown)
-			t.printStackTrace();
-
-			// In case of an error, print the job manager process logs.
-			if (jobManagerProcess[0] != null) {
-				jobManagerProcess[0].printProcessLog();
-			}
-
-			if (jobManagerProcess[1] != null) {
-				jobManagerProcess[1].printProcessLog();
-			}
-
-			throw t;
-		}
-		finally {
-			if (jobManagerProcess[0] != null) {
-				jobManagerProcess[0].destroy();
-			}
-
-			if (jobManagerProcess[1] != null) {
-				jobManagerProcess[1].destroy();
-			}
-
-			if (leaderRetrievalService != null) {
-				leaderRetrievalService.stop();
-			}
-
-			if (taskManagerSystem != null) {
-				taskManagerSystem.shutdown();
-			}
-
-			if (testSystem != null) {
-				testSystem.shutdown();
-			}
-
-			highAvailabilityServices.closeAndCleanupAllData();
-		}
-	}
+	private static final int retainedCheckpoints = 2;
 
 	/**
 	 * Tests that the JobManager logs failures during recovery properly.
@@ -480,13 +324,110 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 		}
 	}
 
+	/**
+	 * Runs the JobManager HA recovery scenario with a RocksDB state backend that has incremental
+	 * checkpointing enabled and writes its checkpoint streams below {@code FileStateBackendBasePath}.
+	 */
+	@Test
+	public void testCheckpointedStreamingProgramIncrementalRocksDB() throws Exception {
+		final int fileStateSizeThreshold = 16;
+		final FsStateBackend checkpointStreamBackend =
+			new FsStateBackend(FileStateBackendBasePath.getAbsoluteFile().toURI(), fileStateSizeThreshold);
+
+		final boolean incrementalCheckpointing = true;
+		final RocksDBStateBackend rocksDBBackend =
+			new RocksDBStateBackend(checkpointStreamBackend, incrementalCheckpointing);
+
+		testCheckpointedStreamingProgram(rocksDBBackend);
+	}
+
+	/**
+	 * Runs a keyed, checkpointed streaming job on a ZooKeeper HA mini cluster with two JobManagers.
+	 * It kills the leading JobManager once {@code CompletedCheckpointsLatch} has been counted down,
+	 * then checks that the job recovers from a checkpoint and still produces the expected sum.
+	 *
+	 * @param stateBackend the state backend used by the job under test
+	 */
+	private void testCheckpointedStreamingProgram(AbstractStateBackend stateBackend) throws Exception {
+
+		// Config
+		final int checkpointingInterval = 100;
+		final int sequenceEnd = 5000;
+		// computed in long arithmetic so that larger parameters cannot silently overflow
+		final long expectedSum = (long) Parallelism * sequenceEnd * (sequenceEnd + 1) / 2;
+
+		final int numJMs = 2;
+		final int numTMs = 4;
+		final int numSlots = 8;
+
+		// not registered as a JUnit @Rule here, so it must be created and deleted explicitly
+		final TemporaryFolder temporaryFolder = new TemporaryFolder();
+		TestingServer testingServer = null;
+		LocalFlinkMiniCluster miniCluster = null;
+
+		try {
+			temporaryFolder.create();
+			testingServer = new TestingServer();
+
+			Configuration config = new Configuration();
+			config.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+
+			String tmpFolderString = temporaryFolder.newFolder().toString();
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, tmpFolderString);
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+			miniCluster = new LocalFlinkMiniCluster(config, true);
+			miniCluster.start();
+
+			ActorGateway jmGateway = miniCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(Parallelism);
+			env.enableCheckpointing(checkpointingInterval);
+			env.setStateBackend(stateBackend);
+
+			// each source emits the sequence twice (repeat = 1); StatefulFlatMap only forwards a
+			// value after Parallelism earlier occurrences of it, which is what expectedSum reflects
+			env
+				.addSource(new CheckpointedSequenceSource(sequenceEnd, 1))
+				.keyBy(new KeySelector<Long, Object>() {
+
+					private static final long serialVersionUID = -8572892067702489025L;
+
+					@Override
+					public Object getKey(Long value) throws Exception {
+						return value;
+					}
+				})
+				.flatMap(new StatefulFlatMap()).setParallelism(1)
+				.addSink(new CountingSink())
+				.setParallelism(1);
+
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			miniCluster.submitJobDetached(jobGraph);
+
+			// wait until enough checkpoints have completed before failing over
+			CompletedCheckpointsLatch.await();
+
+			// kill the current leader so that the second JobManager has to recover the job
+			jmGateway.tell(PoisonPill.getInstance());
+
+			// Wait to finish
+			FinalCountLatch.await();
+
+			assertEquals(expectedSum, (long) FinalCount.get());
+
+			for (int i = 0; i < Parallelism; i++) {
+				assertNotEquals(0, RecoveredStates.get(i));
+			}
+		} finally {
+			// nested so that a failure in one cleanup step does not skip the remaining ones
+			try {
+				if (miniCluster != null) {
+					miniCluster.stop();
+					miniCluster.awaitTermination();
+				}
+			} finally {
+				try {
+					if (testingServer != null) {
+						testingServer.stop();
+						testingServer.close();
+					}
+				} finally {
+					temporaryFolder.delete();
+				}
+			}
+		}
+	}
+
 	// ---------------------------------------------------------------------------------------------
 
 	/**
 	 * A checkpointed source, which emits elements from 0 to a configured number.
 	 */
 	public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long>
-			implements ListCheckpointed<Long> {
+			implements ListCheckpointed<Tuple2<Long, Integer>> {
 
 		private static final Logger LOG = LoggerFactory.getLogger(CheckpointedSequenceSource.class);
 
@@ -496,13 +437,22 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 		private final long end;
 
-		private long current = 0;
+		private int repeat;
+
+		private long current;
 
 		private volatile boolean isRunning = true;
 
 		public CheckpointedSequenceSource(long end) {
+			this(end, 1);
+
+		}
+
+		public CheckpointedSequenceSource(long end, int repeat) {
 			checkArgument(end >= 0, "Negative final count");
+			this.current = 0;
 			this.end = end;
+			this.repeat = repeat;
 		}
 
 		@Override
@@ -511,8 +461,10 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 				synchronized (ctx.getCheckpointLock()) {
 					if (current <= end) {
 						ctx.collect(current++);
-					}
-					else {
+					} else if(repeat > 0) {
+						--repeat;
+						current = 0;
+					} else {
 						ctx.collect(LastElement);
 						return;
 					}
@@ -520,32 +472,33 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 				// Slow down until some checkpoints are completed
 				if (sync.getCount() != 0) {
-					Thread.sleep(100);
+					Thread.sleep(50);
 				}
 			}
 		}
 
 		@Override
-		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+		public List<Tuple2<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
 			LOG.debug("Snapshotting state {} @ ID {}.", current, checkpointId);
-			return Collections.singletonList(this.current);
+			return Collections.singletonList(new Tuple2<>(this.current, this.repeat));
 		}
 
 		@Override
-		public void restoreState(List<Long> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+		public void restoreState(List<Tuple2<Long, Integer>> list) throws Exception {
+			if (list.isEmpty() || list.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + list.size());
 			}
-			Long s = state.get(0);
-			LOG.debug("Restoring state {}", s);
+			Tuple2<Long, Integer> state = list.get(0);
+			LOG.debug("Restoring state {}", state);
 
 			// This is necessary to make sure that something is recovered at all. Otherwise it
 			// might happen that the job is restarted from the beginning.
-			RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), s);
+			RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), 1);
 
 			sync.countDown();
 
-			current = s;
+			current = state._1;
+			repeat = state._2;
 		}
 
 		@Override
@@ -571,6 +524,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 		@Override
 		public void invoke(Long value) throws Exception {
+
 			if (value == LastElement) {
 				numberOfReceivedLastElements++;
 
@@ -611,4 +565,41 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 			CompletedCheckpointsLatch.countDown();
 		}
 	}
+
+	/**
+	 * Keyed flat map that counts, per key, how many elements it has seen so far (in keyed
+	 * {@link ValueState}). It forwards an element only once {@code Parallelism} earlier elements
+	 * with the same key were seen. The end marker {@code LastElement} is always forwarded, because
+	 * {@code CountingSink} relies on it to detect the end of the input.
+	 */
+	public static class StatefulFlatMap extends RichFlatMapFunction<Long, Long> implements CheckpointedFunction {
+
+		private static final long serialVersionUID = 9031079547885320663L;
+
+		/** Number of elements seen so far for the current key; the descriptor's default is 0. */
+		private transient ValueState<Integer> alreadySeen;
+
+		@Override
+		public void flatMap(Long input, Collector<Long> out) throws Exception {
+			final int seen = alreadySeen.value();
+			// use the shared marker constant instead of a magic literal so source, map and sink agree
+			if (seen >= Parallelism || input == LastElement) {
+				out.collect(input);
+			}
+			alreadySeen.update(seen + 1);
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+			// the only state is keyed state obtained from the keyed state store; nothing to do here
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			ValueStateDescriptor<Integer> descriptor =
+				new ValueStateDescriptor<>(
+					"seenCountState",
+					TypeInformation.of(new TypeHint<Integer>() {}),
+					0);
+			alreadySeen = context.getKeyedStateStore().getState(descriptor);
+		}
+	}
 }


[4/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore

Posted by sr...@apache.org.
[FLINK-6633] Register shared state before adding to CompletedCheckpointStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6200407
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6200407
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6200407

Branch: refs/heads/release-1.3
Commit: f6200407979fb6987f86d7029df81b345f0d9525
Parents: f58fec7
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue May 16 12:32:05 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri May 19 11:01:12 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  56 +--
 .../state/RocksDBStateBackendTest.java          |  88 ++++-
 .../runtime/checkpoint/CompletedCheckpoint.java | 144 ++-----
 .../flink/runtime/checkpoint/OperatorState.java |   7 -
 .../checkpoint/OperatorSubtaskState.java        |  11 -
 .../StandaloneCompletedCheckpointStore.java     |   4 +-
 .../flink/runtime/checkpoint/SubtaskState.java  |  11 -
 .../flink/runtime/checkpoint/TaskState.java     |   7 -
 .../ZooKeeperCompletedCheckpointStore.java      | 149 +++-----
 .../savepoint/SavepointV2Serializer.java        |  17 +-
 .../runtime/state/CompositeStateHandle.java     |  15 +-
 .../state/IncrementalKeyedStateHandle.java      | 171 ++++-----
 .../runtime/state/KeyGroupsStateHandle.java     |   5 -
 .../state/PlaceholderStreamStateHandle.java     |  44 +--
 .../runtime/state/SharedStateRegistry.java      |  54 +--
 .../state/memory/ByteStreamStateHandle.java     |   7 +
 .../checkpoint/CheckpointCoordinatorTest.java   |  25 --
 .../CompletedCheckpointStoreTest.java           |  61 +--
 .../checkpoint/CompletedCheckpointTest.java     |   3 -
 .../checkpoint/PendingCheckpointTest.java       |   1 -
 ...ZooKeeperCompletedCheckpointStoreITCase.java |   7 +-
 .../savepoint/CheckpointTestUtils.java          |  25 +-
 .../state/IncrementalKeyedStateHandleTest.java  | 206 ++++++++++
 .../runtime/state/SharedStateRegistryTest.java  |  14 +-
 .../runtime/state/StateBackendTestBase.java     |   2 -
 .../RecoverableCompletedCheckpointStore.java    |   5 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   9 +-
 .../JobManagerHACheckpointRecoveryITCase.java   | 375 +++++++++----------
 28 files changed, 747 insertions(+), 776 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 88a759d..1f32a89 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -105,6 +105,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -170,8 +171,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** True if incremental checkpointing is enabled */
 	private final boolean enableIncrementalCheckpointing;
 
-	/** The sst files materialized in pending checkpoints */
-	private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles = new TreeMap<>();
+	/** The state handle ids of all sst files materialized in snapshots for previous checkpoints */
+	private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();
 
 	/** The identifier of the last completed checkpoint */
 	private long lastCompletedCheckpointId = -1;
@@ -720,7 +721,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private final long checkpointTimestamp;
 
 		/** All sst files that were part of the last previously completed checkpoint */
-		private Map<StateHandleID, StreamStateHandle> baseSstFiles;
+		private Set<StateHandleID> baseSstFiles;
 
 		/** The state meta data */
 		private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
@@ -732,10 +733,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private final CloseableRegistry closeableRegistry = new CloseableRegistry();
 
 		// new sst files since the last completed checkpoint
-		private final Map<StateHandleID, StreamStateHandle> newSstFiles = new HashMap<>();
-
-		// old sst files which have been materialized in previous completed checkpoints
-		private final Map<StateHandleID, StreamStateHandle> oldSstFiles = new HashMap<>();
+		private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
 
 		// handles to the misc files in the current snapshot
 		private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
@@ -830,7 +828,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			// use the last completed checkpoint as the comparison base.
 			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
 
-
 			// save meta data
 			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
 					: stateBackend.kvStateInformation.entrySet()) {
@@ -867,18 +864,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					final StateHandleID stateHandleID = new StateHandleID(fileName);
 
 					if (fileName.endsWith(SST_FILE_SUFFIX)) {
-						StreamStateHandle fileHandle =
-							baseSstFiles == null ? null : baseSstFiles.get(fileName);
+						final boolean existsAlready =
+							baseSstFiles == null ? false : baseSstFiles.contains(stateHandleID);
 
-						if (fileHandle == null) {
-							fileHandle = materializeStateData(filePath);
-							newSstFiles.put(stateHandleID, fileHandle);
-						} else {
+						if (existsAlready) {
 							// we introduce a placeholder state handle, that is replaced with the
 							// original from the shared state registry (created from a previous checkpoint)
-							oldSstFiles.put(
+							sstFiles.put(
 								stateHandleID,
-								new PlaceholderStreamStateHandle(fileHandle.getStateSize()));
+								new PlaceholderStreamStateHandle());
+						} else {
+							sstFiles.put(stateHandleID, materializeStateData(filePath));
 						}
 					} else {
 						StreamStateHandle fileHandle = materializeStateData(filePath);
@@ -887,22 +883,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				}
 			}
 
-			Map<StateHandleID, StreamStateHandle> sstFiles =
-				new HashMap<>(newSstFiles.size() + oldSstFiles.size());
 
-			sstFiles.putAll(newSstFiles);
-			sstFiles.putAll(oldSstFiles);
 
 			synchronized (stateBackend.asyncSnapshotLock) {
-				stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+				stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
 			}
 
 			return new IncrementalKeyedStateHandle(
 				stateBackend.operatorIdentifier,
 				stateBackend.keyGroupRange,
 				checkpointId,
-				newSstFiles,
-				oldSstFiles,
+				sstFiles,
 				miscFiles,
 				metaStateHandle);
 		}
@@ -933,7 +924,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				statesToDiscard.add(metaStateHandle);
 				statesToDiscard.addAll(miscFiles.values());
-				statesToDiscard.addAll(newSstFiles.values());
+				statesToDiscard.addAll(sstFiles.values());
 
 				try {
 					StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
@@ -1308,15 +1299,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				UUID.randomUUID().toString());
 
 			try {
-				final Map<StateHandleID, StreamStateHandle> newSstFiles =
-					restoreStateHandle.getCreatedSharedState();
-				final Map<StateHandleID, StreamStateHandle> oldSstFiles =
-					restoreStateHandle.getReferencedSharedState();
+				final Map<StateHandleID, StreamStateHandle> sstFiles =
+					restoreStateHandle.getSharedState();
 				final Map<StateHandleID, StreamStateHandle> miscFiles =
 					restoreStateHandle.getPrivateState();
 
-				readAllStateData(newSstFiles, restoreInstancePath);
-				readAllStateData(oldSstFiles, restoreInstancePath);
+				readAllStateData(sstFiles, restoreInstancePath);
 				readAllStateData(miscFiles, restoreInstancePath);
 
 				// read meta data
@@ -1409,8 +1397,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						throw new IOException("Could not create RocksDB data directory.");
 					}
 
-					createFileHardLinksInRestorePath(newSstFiles, restoreInstancePath);
-					createFileHardLinksInRestorePath(oldSstFiles, restoreInstancePath);
+					createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
 					createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
 
 					List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
@@ -1437,10 +1424,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 
 					// use the restore sst files as the base for succeeding checkpoints
-					Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
-					sstFiles.putAll(newSstFiles);
-					sstFiles.putAll(oldSstFiles);
-					stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles);
+					stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
 
 					stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9340455..89eb1d5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,18 +26,22 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -58,7 +62,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.RunnableFuture;
 
 import static junit.framework.TestCase.assertNotNull;
@@ -67,6 +75,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -351,6 +360,83 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 		assertEquals(1, allFilesInDbDir.size());
 	}
 
+	/**
+	 * Takes three incremental snapshots and checks, on a spied {@link SharedStateRegistry}, two things:
+	 * every shared state entry is registered when the snapshot handle registers its shared state,
+	 * and it is unregistered again when that handle is discarded. Does nothing unless
+	 * {@code enableIncrementalCheckpointing} is set.
+	 */
+	@Test
+	public void testSharedIncrementalStateDeRegistration() throws Exception {
+		if (!enableIncrementalCheckpointing) {
+			return;
+		}
+
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+		try {
+			ValueStateDescriptor<String> kvId =
+				new ValueStateDescriptor<>("id", String.class, null);
+
+			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+			ValueState<String> state =
+				backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+			Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>();
+			SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
+			for (int checkpointId = 0; checkpointId < 3; ++checkpointId) {
+
+				// start every round with a clean interaction history on the spy
+				reset(sharedStateRegistry);
+
+				backend.setCurrentKey(checkpointId);
+				state.update("Hello-" + checkpointId);
+
+				RunnableFuture<KeyedStateHandle> snapshot = backend.snapshot(
+					checkpointId,
+					checkpointId,
+					createStreamFactory(),
+					CheckpointOptions.forFullCheckpoint());
+
+				snapshot.run();
+
+				IncrementalKeyedStateHandle stateHandle = (IncrementalKeyedStateHandle) snapshot.get();
+				// copy before registering: registration may replace placeholder handles in the live map
+				Map<StateHandleID, StreamStateHandle> sharedState =
+					new HashMap<>(stateHandle.getSharedState());
+
+				stateHandle.registerSharedStates(sharedStateRegistry);
+
+				for (Map.Entry<StateHandleID, StreamStateHandle> e : sharedState.entrySet()) {
+					verify(sharedStateRegistry).registerReference(
+						stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()),
+						e.getValue());
+				}
+
+				previousStateHandles.add(stateHandle);
+				backend.notifyCheckpointComplete(checkpointId);
+
+				//-----------------------------------------------------------------
+
+				// discard the oldest handle so that only the latest one stays registered
+				if (previousStateHandles.size() > 1) {
+					checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+				}
+			}
+
+			while (!previousStateHandles.isEmpty()) {
+
+				reset(sharedStateRegistry);
+
+				checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+			}
+		} finally {
+			// release the backend even if a verification above failed
+			try {
+				backend.close();
+			} finally {
+				backend.dispose();
+			}
+		}
+	}
+
+	/**
+	 * Discards the given handle and uses the spied registry to check its shared state keys:
+	 * none may be unregistered before the discard, and each must be unregistered once afterwards.
+	 */
+	private void checkRemove(IncrementalKeyedStateHandle handle, SharedStateRegistry registry) throws Exception {
+		// before discarding, no shared entry of this handle may have been unregistered
+		for (StateHandleID sharedId : handle.getSharedState().keySet()) {
+			verify(registry, times(0))
+				.unregisterReference(handle.createSharedStateRegistryKeyFromFileName(sharedId));
+		}
+
+		handle.discardState();
+
+		// after discarding, every shared entry must have been unregistered exactly once
+		for (StateHandleID sharedId : handle.getSharedState().keySet()) {
+			verify(registry, times(1))
+				.unregisterReference(handle.createSharedStateRegistryKeyFromFileName(sharedId));
+		}
+	}
 
 	private void runStateUpdates() throws Exception{
 		for (int i = 50; i < 150; ++i) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 1ab5b41..b382080 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -25,8 +25,6 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -177,13 +175,13 @@ public class CompletedCheckpoint implements Serializable {
 	}
 
 	public void discardOnFailedStoring() throws Exception {
-		new UnstoredDiscardStategy().discard();
+		doDiscard();
 	}
 
 	public boolean discardOnSubsume(SharedStateRegistry sharedStateRegistry) throws Exception {
 
 		if (props.discardOnSubsumed()) {
-			new StoredDiscardStrategy(sharedStateRegistry).discard();
+			doDiscard();
 			return true;
 		}
 
@@ -197,7 +195,7 @@ public class CompletedCheckpoint implements Serializable {
 				jobStatus == JobStatus.FAILED && props.discardOnJobFailed() ||
 				jobStatus == JobStatus.SUSPENDED && props.discardOnJobSuspended()) {
 
-			new StoredDiscardStrategy(sharedStateRegistry).discard();
+			doDiscard();
 			return true;
 		} else {
 			if (externalPointer != null) {
@@ -209,6 +207,42 @@ public class CompletedCheckpoint implements Serializable {
 		}
 	}
 
+	private void doDiscard() throws Exception {
+		// Common discard path; operatorStates is cleared and the callback notified even on failure.
+		try {
+			// collect exceptions and continue cleanup; the first one is rethrown at the end
+			Exception exception = null;
+
+			// drop the metadata, if we have some
+			if (externalizedMetadata != null) {
+				try {
+					externalizedMetadata.discardState();
+				} catch (Exception e) {
+					exception = e;
+				}
+			}
+
+			// discard the operator state objects, chaining any failure onto the first exception
+			try {
+				StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			if (exception != null) {
+				throw exception;
+			}
+		} finally {
+			operatorStates.clear();
+
+			// to be null-pointer safe, copy reference to stack
+			CompletedCheckpointStats.DiscardCallback discardCallback = this.discardCallback;
+			if (discardCallback != null) {
+				discardCallback.notifyDiscardedCheckpoint();
+			}
+		}
+	}
+
 	public long getStateSize() {
 		long result = 0L;
 
@@ -252,7 +286,7 @@ public class CompletedCheckpoint implements Serializable {
 
 	/**
 	 * Register all shared states in the given registry. This is method is called
-	 * when the completed checkpoint has been successfully added into the store.
+	 * before the checkpoint is added into the store.
 	 *
 	 * @param sharedStateRegistry The registry where shared states are registered
 	 */
@@ -266,102 +300,4 @@ public class CompletedCheckpoint implements Serializable {
 	public String toString() {
 		return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
 	}
-
-	/**
-	 * Base class for the discarding strategies of {@link CompletedCheckpoint}.
-	 */
-	private abstract class DiscardStrategy {
-
-		protected Exception storedException;
-
-		public DiscardStrategy() {
-			this.storedException = null;
-		}
-
-		public void discard() throws Exception {
-
-			try {
-				// collect exceptions and continue cleanup
-				storedException = null;
-
-				doDiscardExternalizedMetaData();
-				doDiscardSharedState();
-				doDiscardPrivateState();
-				doReportStoredExceptions();
-			} finally {
-				clearTaskStatesAndNotifyDiscardCompleted();
-			}
-		}
-
-		protected void doDiscardExternalizedMetaData() {
-			// drop the metadata, if we have some
-			if (externalizedMetadata != null) {
-				try {
-					externalizedMetadata.discardState();
-				} catch (Exception e) {
-					storedException = e;
-				}
-			}
-		}
-
-		protected void doDiscardPrivateState() {
-			// discard private state objects
-			try {
-				StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
-			} catch (Exception e) {
-				storedException = ExceptionUtils.firstOrSuppressed(e, storedException);
-			}
-		}
-
-		protected abstract void doDiscardSharedState();
-
-		protected void doReportStoredExceptions() throws Exception {
-			if (storedException != null) {
-				throw storedException;
-			}
-		}
-
-		protected void clearTaskStatesAndNotifyDiscardCompleted() {
-			operatorStates.clear();
-			// to be null-pointer safe, copy reference to stack
-			CompletedCheckpointStats.DiscardCallback discardCallback =
-				CompletedCheckpoint.this.discardCallback;
-
-			if (discardCallback != null) {
-				discardCallback.notifyDiscardedCheckpoint();
-			}
-		}
-	}
-
-	/**
-	 * Discard all shared states created in the checkpoint. This strategy is applied
-	 * when the completed checkpoint fails to be added into the store.
-	 */
-	private class UnstoredDiscardStategy extends CompletedCheckpoint.DiscardStrategy {
-
-		@Override
-		protected void doDiscardSharedState() {
-			// nothing to do because we did not register any shared state yet. unregistered, new
-			// shared state is then still considered private state and deleted as part of
-			// doDiscardPrivateState().
-		}
-	}
-
-	/**
-	 * Unregister all shared states from the given registry. This is strategy is
-	 * applied when the completed checkpoint is subsumed or the job terminates.
-	 */
-	private class StoredDiscardStrategy extends CompletedCheckpoint.DiscardStrategy {
-
-		SharedStateRegistry sharedStateRegistry;
-
-		public StoredDiscardStrategy(SharedStateRegistry sharedStateRegistry) {
-			this.sharedStateRegistry = Preconditions.checkNotNull(sharedStateRegistry);
-		}
-
-		@Override
-		protected void doDiscardSharedState() {
-			sharedStateRegistry.unregisterAll(operatorStates.values());
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index aa676e7..b153028 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -126,13 +126,6 @@ public class OperatorState implements CompositeStateHandle {
 	}
 
 	@Override
-	public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
-		for (OperatorSubtaskState operatorSubtaskState : operatorSubtaskStates.values()) {
-			operatorSubtaskState.unregisterSharedStates(sharedStateRegistry);
-		}
-	}
-
-	@Override
 	public long getStateSize() {
 		long result = 0L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index 49ef863..e2ae632 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -158,17 +158,6 @@ public class OperatorSubtaskState implements CompositeStateHandle {
 	}
 
 	@Override
-	public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
-		if (managedKeyedState != null) {
-			managedKeyedState.unregisterSharedStates(sharedStateRegistry);
-		}
-
-		if (rawKeyedState != null) {
-			rawKeyedState.unregisterSharedStates(sharedStateRegistry);
-		}
-	}
-
-	@Override
 	public long getStateSize() {
 		return stateSize;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index f5e1db3..233cfc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -63,10 +63,10 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpo
 	@Override
 	public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
 		
-		checkpoints.addLast(checkpoint);
-
 		checkpoint.registerSharedStates(sharedStateRegistry);
 
+		checkpoints.addLast(checkpoint);
+
 		if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
 			try {
 				CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index a77baf3..20d675b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -162,17 +162,6 @@ public class SubtaskState implements CompositeStateHandle {
 	}
 
 	@Override
-	public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
-		if (managedKeyedState != null) {
-			managedKeyedState.unregisterSharedStates(sharedStateRegistry);
-		}
-
-		if (rawKeyedState != null) {
-			rawKeyedState.unregisterSharedStates(sharedStateRegistry);
-		}
-	}
-
-	@Override
 	public long getStateSize() {
 		return stateSize;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index aa5c516..ed847a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -141,13 +141,6 @@ public class TaskState implements CompositeStateHandle {
 	}
 
 	@Override
-	public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
-		for (SubtaskState subtaskState : subtaskStates.values()) {
-			subtaskState.unregisterSharedStates(sharedStateRegistry);
-		}
-	}
-
-	@Override
 	public long getStateSize() {
 		long result = 0L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 084d93e..4c3c1ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -35,7 +35,6 @@ import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -79,7 +78,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 	private final int maxNumberOfCheckpointsToRetain;
 
 	/** Local completed checkpoints. */
-	private final ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
+	private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
 	/**
 	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
@@ -122,7 +121,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
 
-		this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
+		this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 		
 		LOG.info("Initialized in '{}'.", checkpointsPath);
 	}
@@ -146,7 +145,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		// Clear local handles in order to prevent duplicates on
 		// recovery. The local handles should reflect the state
 		// of ZooKeeper.
-		checkpointStateHandles.clear();
+		completedCheckpoints.clear();
 
 		// Get all there is first
 		List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
@@ -170,6 +169,11 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 			try {
 				completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
+				if (completedCheckpoint != null) {
+					// Re-register all shared states in the checkpoint.
+					completedCheckpoint.registerSharedStates(sharedStateRegistry);
+					completedCheckpoints.add(completedCheckpoint);
+				}
 			} catch (Exception e) {
 				LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
 					"checkpoint store.", e);
@@ -177,11 +181,6 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 				// remove the checkpoint with broken state handle
 				removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
 			}
-
-			if (completedCheckpoint != null) {
-				completedCheckpoint.registerSharedStates(sharedStateRegistry);
-				checkpointStateHandles.add(checkpointStateHandle);
-			}
 		}
 	}
 
@@ -195,20 +194,19 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		checkNotNull(checkpoint, "Checkpoint");
 		
 		final String path = checkpointIdToPath(checkpoint.getCheckpointID());
-		final RetrievableStateHandle<CompletedCheckpoint> stateHandle;
 
-		// First add the new one. If it fails, we don't want to loose existing data.
-		stateHandle = checkpointsInZooKeeper.addAndLock(path, checkpoint);
+		// First, register all shared states in the checkpoint to consolidate placeholders.
+		checkpoint.registerSharedStates(sharedStateRegistry);
 
-		checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));
+		// Now add the new one. If it fails, we don't want to lose existing data.
+		checkpointsInZooKeeper.addAndLock(path, checkpoint);
 
-		// Register all shared states in the checkpoint
-		checkpoint.registerSharedStates(sharedStateRegistry);
+		completedCheckpoints.addLast(checkpoint);
 
 		// Everything worked, let's remove a previous checkpoint if necessary.
-		while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
+		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
 			try {
-				removeSubsumed(checkpointStateHandles.removeFirst().f1, sharedStateRegistry);
+				removeSubsumed(completedCheckpoints.removeFirst(), sharedStateRegistry);
 			} catch (Exception e) {
 				LOG.warn("Failed to subsume the old checkpoint", e);
 			}
@@ -219,60 +217,23 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 
 	@Override
 	public CompletedCheckpoint getLatestCheckpoint() {
-		if (checkpointStateHandles.isEmpty()) {
+		if (completedCheckpoints.isEmpty()) {
 			return null;
 		}
 		else {
-			while(!checkpointStateHandles.isEmpty()) {
-				Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast();
-
-				try {
-					return retrieveCompletedCheckpoint(checkpointStateHandle);
-				} catch (Exception e) {
-					LOG.warn("Could not retrieve latest checkpoint. Removing it from " +
-						"the completed checkpoint store.", e);
-
-					try {
-						// remove the checkpoint with broken state handle
-						Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint = checkpointStateHandles.pollLast();
-						removeBrokenStateHandle(checkpoint.f1, checkpoint.f0);
-					} catch (Exception removeException) {
-						LOG.warn("Could not remove the latest checkpoint with a broken state handle.", removeException);
-					}
-				}
-			}
-
-			return null;
+			return completedCheckpoints.peekLast();
 		}
 	}
 
 	@Override
 	public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
-		List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size());
-
-		Iterator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> stateHandleIterator = checkpointStateHandles.iterator();
-
-		while (stateHandleIterator.hasNext()) {
-			Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandlePath = stateHandleIterator.next();
-
-			try {
-				checkpoints.add(retrieveCompletedCheckpoint(stateHandlePath));
-			} catch (Exception e) {
-				LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
-					"checkpoint store.", e);
-
-				// remove the checkpoint with broken state handle
-				stateHandleIterator.remove();
-				removeBrokenStateHandle(stateHandlePath.f1, stateHandlePath.f0);
-			}
-		}
-
+		List<CompletedCheckpoint> checkpoints = new ArrayList<>(completedCheckpoints);
 		return checkpoints;
 	}
 
 	@Override
 	public int getNumberOfRetainedCheckpoints() {
-		return checkpointStateHandles.size();
+		return completedCheckpoints.size();
 	}
 
 	@Override
@@ -285,15 +246,15 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 		if (jobStatus.isGloballyTerminalState()) {
 			LOG.info("Shutting down");
 
-			for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
+			for (CompletedCheckpoint checkpoint : completedCheckpoints) {
 				try {
-					removeShutdown(checkpoint.f1, jobStatus, sharedStateRegistry);
+					removeShutdown(checkpoint, jobStatus, sharedStateRegistry);
 				} catch (Exception e) {
 					LOG.error("Failed to discard checkpoint.", e);
 				}
 			}
 
-			checkpointStateHandles.clear();
+			completedCheckpoints.clear();
 
 			String path = "/" + client.getNamespace();
 
@@ -303,7 +264,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 			LOG.info("Suspending");
 
 			// Clear the local handles, but don't remove any state
-			checkpointStateHandles.clear();
+			completedCheckpoints.clear();
 
 			// Release the state handle locks in ZooKeeper such that they can be deleted
 			checkpointsInZooKeeper.releaseAll();
@@ -313,21 +274,18 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 	// ------------------------------------------------------------------------
 
 	private void removeSubsumed(
-		final String pathInZooKeeper,
+		final CompletedCheckpoint completedCheckpoint,
 		final SharedStateRegistry sharedStateRegistry) throws Exception {
-		
-		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-			@Override
-			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-				if (value != null) {
-					final CompletedCheckpoint completedCheckpoint;
-					try {
-						completedCheckpoint = value.retrieveState();
-					} catch (Exception e) {
-						throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e);
-					}
 
-					if (completedCheckpoint != null) {
+		if(completedCheckpoint == null) {
+			return;
+		}
+
+		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action =
+			new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
+				@Override
+				public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
+					if (value != null) {
 						try {
 							completedCheckpoint.discardOnSubsume(sharedStateRegistry);
 						} catch (Exception e) {
@@ -335,46 +293,41 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 						}
 					}
 				}
-			}
-		};
+			};
 
-		checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, action);
+		checkpointsInZooKeeper.releaseAndTryRemove(
+			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
+			action);
 	}
 
 	private void removeShutdown(
-			final String pathInZooKeeper,
+			final CompletedCheckpoint completedCheckpoint,
 			final JobStatus jobStatus,
 			final SharedStateRegistry sharedStateRegistry) throws Exception {
 
+		if (completedCheckpoint == null) {
+			return;
+		}
+		// The discard itself runs inside the remove callback handed to releaseAndTryRemove below.
 		ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
 			@Override
 			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-				if (value != null) {
-					final CompletedCheckpoint completedCheckpoint;
-
-					try {
-						completedCheckpoint = value.retrieveState();
-					} catch (Exception e) {
-						throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e);
-					}
-
-					if (completedCheckpoint != null) {
-						try {
-							completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
-						} catch (Exception e) {
-							throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
-						}
-					}
+				try {
+					completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
+				} catch (Exception e) {
+					throw new FlinkException("Could not discard the completed checkpoint on shutdown.", e);
 				}
 			}
 		};
 
-		checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, removeAction);
+		checkpointsInZooKeeper.releaseAndTryRemove(
+			checkpointIdToPath(completedCheckpoint.getCheckpointID()),
+			removeAction);
 	}
 
 	private void removeBrokenStateHandle(
-			final String pathInZooKeeper,
-			final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle) throws Exception {
+		final String pathInZooKeeper,
+		final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle) throws Exception {
 		checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
 			@Override
 			public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index b71418b..da0022c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
@@ -75,7 +74,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 	private static final byte KEY_GROUPS_HANDLE = 3;
 	private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
 	private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
-	private static final byte PLACEHOLDER_STREAM_STATE_HANDLE = 6;
 
 	/** The singleton instance of the serializer */
 	public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer();
@@ -328,8 +326,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 
 			serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), dos);
 
-			serializeStreamStateHandleMap(incrementalKeyedStateHandle.getCreatedSharedState(), dos);
-			serializeStreamStateHandleMap(incrementalKeyedStateHandle.getReferencedSharedState(), dos);
+			serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), dos);
 			serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos);
 		} else {
 			throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
@@ -390,16 +387,14 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 				KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
 
 			StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis);
-			Map<StateHandleID, StreamStateHandle> createdStates = deserializeStreamStateHandleMap(dis);
-			Map<StateHandleID, StreamStateHandle> referencedStates = deserializeStreamStateHandleMap(dis);
+			Map<StateHandleID, StreamStateHandle> sharedStates = deserializeStreamStateHandleMap(dis);
 			Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis);
 
 			return new IncrementalKeyedStateHandle(
 				operatorId,
 				keyGroupRange,
 				checkpointId,
-				createdStates,
-				referencedStates,
+				sharedStates,
 				privateStates,
 				metaDataStateHandle);
 		} else {
@@ -485,10 +480,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 			byte[] internalData = byteStreamStateHandle.getData();
 			dos.writeInt(internalData.length);
 			dos.write(byteStreamStateHandle.getData());
-		} else if (stateHandle instanceof PlaceholderStreamStateHandle) {
-			PlaceholderStreamStateHandle placeholder = (PlaceholderStreamStateHandle) stateHandle;
-			dos.writeByte(PLACEHOLDER_STREAM_STATE_HANDLE);
-			dos.writeLong(placeholder.getStateSize());
 		} else {
 			throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
 		}
@@ -510,8 +501,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 			byte[] data = new byte[numBytes];
 			dis.readFully(data);
 			return new ByteStreamStateHandle(handleName, data);
-		} else if (PLACEHOLDER_STREAM_STATE_HANDLE == type) {
-			return new PlaceholderStreamStateHandle(dis.readLong());
 		} else {
 			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
index 002b7c3..1bc6a0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
@@ -34,7 +34,8 @@ package org.apache.flink.runtime.state;
  * this handle and considered as private state until it is registered for the first time. Registration
  * transfers ownership to the {@link SharedStateRegistry}.
  * The composite state handle should only delete all private states in the
- * {@link StateObject#discardState()} method.
+ * {@link StateObject#discardState()} method, the {@link SharedStateRegistry} is responsible for
+ * deleting shared states after they were registered.
  */
 public interface CompositeStateHandle extends StateObject {
 
@@ -45,18 +46,10 @@ public interface CompositeStateHandle extends StateObject {
 	 * <p>
 	 * After this is completed, newly created shared state is considered as published is no longer
 	 * owned by this handle. This means that it should no longer be deleted as part of calls to
-	 * {@link #discardState()}.
+	 * {@link #discardState()}. Instead, {@link #discardState()} will trigger an unregistration
+	 * from the registry.
 	 *
 	 * @param stateRegistry The registry where shared states are registered.
 	 */
 	void registerSharedStates(SharedStateRegistry stateRegistry);
-
-	/**
-	 * Unregister both created and referenced shared states in the given
-	 * {@link SharedStateRegistry}. This method is called when the checkpoint is
-	 * subsumed or the job is shut down.
-	 *
-	 * @param stateRegistry The registry where shared states are registered.
-	 */
-	void unregisterSharedStates(SharedStateRegistry stateRegistry);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 706e219..770b5a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -28,18 +28,24 @@ import java.util.Map;
 /**
  * The handle to states of an incremental snapshot.
  * <p>
- * The states contained in an incremental snapshot include
+ * The states contained in an incremental snapshot include:
  * <ul>
- * <li> Created shared state which includes (the supposed to be) shared files produced since the last
+ * <li> Created shared state which includes shared files produced since the last
  * completed checkpoint. These files can be referenced by succeeding checkpoints if the
  * checkpoint succeeds to complete. </li>
  * <li> Referenced shared state which includes the shared files materialized in previous
- * checkpoints. </li>
+ * checkpoints. Until this handle is registered with a {@link SharedStateRegistry}, all referenced
+ * shared state handles are only placeholders, so that we do not resend state handles that are
+ * already known to exist on the checkpoint coordinator.</li>
  * <li> Private state which includes all other files, typically mutable, that cannot be shared by
  * other checkpoints. </li>
  * <li> Backend meta state which includes the information of existing states. </li>
  * </ul>
  *
+ * When this should become a completed checkpoint on the checkpoint coordinator, it must first be
+ * registered with a {@link SharedStateRegistry}, so that all placeholder state handles to
+ * previously existing state are replaced with the originals.
+ *
  * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They
  * should not be called from production code. This means this class is also not suited to serve as
  * a key, e.g. in hash maps.
@@ -66,14 +72,9 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	private final long checkpointId;
 
 	/**
-	 * State that the incremental checkpoint created new
-	 */
-	private final Map<StateHandleID, StreamStateHandle> createdSharedState;
-
-	/**
-	 * State that the incremental checkpoint references from previous checkpoints
+	 * Shared state in the incremental checkpoint.
 	 */
-	private final Map<StateHandleID, StreamStateHandle> referencedSharedState;
+	private final Map<StateHandleID, StreamStateHandle> sharedState;
 
 	/**
 	 * Private state in the incremental checkpoint
@@ -86,32 +87,30 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	private final StreamStateHandle metaStateHandle;
 
 	/**
-	 * True if the state handle has already registered shared states.
-	 * <p>
-	 * Once the shared states are registered, it's the {@link SharedStateRegistry}'s
-	 * responsibility to maintain the shared states. But in the cases where the
-	 * state handle is discarded before performing the registration, the handle
-	 * should delete all the shared states created by it.
+	 * Once the shared states are registered, it is the {@link SharedStateRegistry}'s
+	 * responsibility to clean up those shared states.
+	 * But in cases where the state handle is discarded before performing the registration,
+	 * the handle should delete all the shared states created by it.
+	 *
+	 * <p>This variable is not null iff the handle was registered.
 	 */
-	private boolean registered;
+	private transient SharedStateRegistry sharedStateRegistry;
 
 	public IncrementalKeyedStateHandle(
 		String operatorIdentifier,
 		KeyGroupRange keyGroupRange,
 		long checkpointId,
-		Map<StateHandleID, StreamStateHandle> createdSharedState,
-		Map<StateHandleID, StreamStateHandle> referencedSharedState,
+		Map<StateHandleID, StreamStateHandle> sharedState,
 		Map<StateHandleID, StreamStateHandle> privateState,
 		StreamStateHandle metaStateHandle) {
 
 		this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
 		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 		this.checkpointId = checkpointId;
-		this.createdSharedState = Preconditions.checkNotNull(createdSharedState);
-		this.referencedSharedState = Preconditions.checkNotNull(referencedSharedState);
+		this.sharedState = Preconditions.checkNotNull(sharedState);
 		this.privateState = Preconditions.checkNotNull(privateState);
 		this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle);
-		this.registered = false;
+		this.sharedStateRegistry = null;
 	}
 
 	@Override
@@ -123,12 +122,8 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		return checkpointId;
 	}
 
-	public Map<StateHandleID, StreamStateHandle> getCreatedSharedState() {
-		return createdSharedState;
-	}
-
-	public Map<StateHandleID, StreamStateHandle> getReferencedSharedState() {
-		return referencedSharedState;
+	public Map<StateHandleID, StreamStateHandle> getSharedState() {
+		return sharedState;
 	}
 
 	public Map<StateHandleID, StreamStateHandle> getPrivateState() {
@@ -155,8 +150,6 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	@Override
 	public void discardState() throws Exception {
 
-		Preconditions.checkState(!registered, "Attempt to dispose a registered composite state with registered shared state. Must unregister first.");
-
 		try {
 			metaStateHandle.discardState();
 		} catch (Exception e) {
@@ -169,37 +162,35 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 			LOG.warn("Could not properly discard misc file states.", e);
 		}
 
-		try {
-			StateUtil.bestEffortDiscardAllStateObjects(createdSharedState.values());
-		} catch (Exception e) {
-			LOG.warn("Could not properly discard new sst file states.", e);
+		// If this was not registered, we can delete the shared state. We can simply apply this
+		// to all handles, because all handles that were not newly created by this checkpoint
+		// are only placeholders at this point (disposing them is a NOP).
+		if (sharedStateRegistry == null) {
+			try {
+				StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+			} catch (Exception e) {
+				LOG.warn("Could not properly discard new sst file states.", e);
+			}
+		} else {
+			// If this was registered, we only unregister all our referenced shared states
+			// from the registry.
+			for (StateHandleID stateHandleID : sharedState.keySet()) {
+				sharedStateRegistry.unregisterReference(
+					createSharedStateRegistryKeyFromFileName(stateHandleID));
+			}
 		}
-
 	}
 
 	@Override
 	public long getStateSize() {
-		long size = getPrivateStateSize();
-
-		for (StreamStateHandle oldSstFileHandle : referencedSharedState.values()) {
-			size += oldSstFileHandle.getStateSize();
-		}
-
-		return size;
-	}
-
-	/**
-	 * Returns the size of the state that is privately owned by this handle.
-	 */
-	public long getPrivateStateSize() {
 		long size = StateUtil.getStateSize(metaStateHandle);
 
-		for (StreamStateHandle newSstFileHandle : createdSharedState.values()) {
-			size += newSstFileHandle.getStateSize();
+		for (StreamStateHandle sharedStateHandle : sharedState.values()) {
+			size += sharedStateHandle.getStateSize();
 		}
 
-		for (StreamStateHandle miscFileHandle : privateState.values()) {
-			size += miscFileHandle.getStateSize();
+		for (StreamStateHandle privateStateHandle : privateState.values()) {
+			size += privateStateHandle.getStateSize();
 		}
 
 		return size;
@@ -208,64 +199,38 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	@Override
 	public void registerSharedStates(SharedStateRegistry stateRegistry) {
 
-		Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
+		Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states.");
+
+		sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
 
-		for (Map.Entry<StateHandleID, StreamStateHandle> newSstFileEntry : createdSharedState.entrySet()) {
+		for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
 			SharedStateRegistryKey registryKey =
-				createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
+				createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
 
 			SharedStateRegistry.Result result =
-				stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
-
-			// We update our reference with the result from the registry, to prevent the following
-			// problem:
+				stateRegistry.registerReference(registryKey, sharedStateHandle.getValue());
+
+			// This step consolidates our shared handles with the registry, which does two things:
+			//
+			// 1) Replace placeholder state handle with already registered, actual state handles.
+			//
+			// 2) Deduplicate re-uploads of incremental state due to missing confirmations about
+			// completed checkpoints.
+			//
+			// This prevents the following problem:
 			// A previous checkpoint n has already registered the state. This can happen if a
 			// following checkpoint (n + x) wants to reference the same state before the backend got
 			// notified that checkpoint n completed. In this case, the shared registry did
 			// deduplication and returns the previous reference.
-			newSstFileEntry.setValue(result.getReference());
-		}
-
-		for (Map.Entry<StateHandleID, StreamStateHandle> oldSstFileName : referencedSharedState.entrySet()) {
-			SharedStateRegistryKey registryKey =
-				createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
-
-			SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey);
-
-			// Again we update our state handle with the result from the registry, thus replacing
-			// placeholder state handles with the originals.
-			oldSstFileName.setValue(result.getReference());
-		}
-
-		// Migrate state from unregistered to registered, so that it will not count as private state
-		// for #discardState() from now.
-		referencedSharedState.putAll(createdSharedState);
-		createdSharedState.clear();
-
-		registered = true;
-	}
-
-	@Override
-	public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
-
-		Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
-
-		for (Map.Entry<StateHandleID, StreamStateHandle> newSstFileEntry : createdSharedState.entrySet()) {
-			SharedStateRegistryKey registryKey =
-				createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
-			stateRegistry.releaseReference(registryKey);
-		}
-
-		for (Map.Entry<StateHandleID, StreamStateHandle> oldSstFileEntry : referencedSharedState.entrySet()) {
-			SharedStateRegistryKey registryKey =
-				createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey());
-			stateRegistry.releaseReference(registryKey);
+			sharedStateHandle.setValue(result.getReference());
 		}
-
-		registered = false;
 	}
 
-	private SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+	/**
+	 * Create a unique key to register one of our shared state handles.
+	 */
+	@VisibleForTesting
+	public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
 		return new SharedStateRegistryKey(operatorIdentifier + '-' + keyGroupRange, shId);
 	}
 
@@ -293,10 +258,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
 			return false;
 		}
-		if (!getCreatedSharedState().equals(that.getCreatedSharedState())) {
-			return false;
-		}
-		if (!getReferencedSharedState().equals(that.getReferencedSharedState())) {
+		if (!getSharedState().equals(that.getSharedState())) {
 			return false;
 		}
 		if (!getPrivateState().equals(that.getPrivateState())) {
@@ -314,8 +276,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		int result = getOperatorIdentifier().hashCode();
 		result = 31 * result + getKeyGroupRange().hashCode();
 		result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32));
-		result = 31 * result + getCreatedSharedState().hashCode();
-		result = 31 * result + getReferencedSharedState().hashCode();
+		result = 31 * result + getSharedState().hashCode();
 		result = 31 * result + getPrivateState().hashCode();
 		result = 31 * result + getMetaStateHandle().hashCode();
 		return result;

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 8280460..8e38ad4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -98,11 +98,6 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle
 	}
 
 	@Override
-	public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
-		// No shared states
-	}
-
-	@Override
 	public void discardState() throws Exception {
 		stateHandle.discardState();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
index 2136061..7c948a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
@@ -18,29 +18,20 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 /**
  * A placeholder state handle for shared state that will replaced by an original that was
- * created in a previous checkpoint. So we don't have to send the handle twice, e.g. in
- * case of {@link ByteStreamStateHandle}. To be used in the referenced states of
+ * created in a previous checkpoint, so that we don't have to send a state handle twice, e.g. in
+ * case of {@link ByteStreamStateHandle}. This class is used in the referenced states of
  * {@link IncrementalKeyedStateHandle}.
- * <p>
- * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They
- * should not be called from production code. This means this class is also not suited to serve as
- * a key, e.g. in hash maps.
  */
 public class PlaceholderStreamStateHandle implements StreamStateHandle {
 
 	private static final long serialVersionUID = 1L;
 
-	/** We remember the size of the original file for which this is a placeholder */
-	private final long originalSize;
-
-	public PlaceholderStreamStateHandle(long originalSize) {
-		this.originalSize = originalSize;
+	public PlaceholderStreamStateHandle() {
 	}
 
 	@Override
@@ -56,33 +47,6 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle {
 
 	@Override
 	public long getStateSize() {
-		return originalSize;
-	}
-
-	/**
-	 * This method is should only be called in tests! This should never serve as key in a hash map.
-	 */
-	@VisibleForTesting
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		PlaceholderStreamStateHandle that = (PlaceholderStreamStateHandle) o;
-
-		return originalSize == that.originalSize;
-	}
-
-	/**
-	 * This method is should only be called in tests! This should never serve as key in a hash map.
-	 */
-	@VisibleForTesting
-	@Override
-	public int hashCode() {
-		return (int) (originalSize ^ (originalSize >>> 32));
+		return 0L;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index f9161b0..a5e0f84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -54,7 +54,7 @@ public class SharedStateRegistry {
 	}
 
 	/**
-	 * Register a reference to the given (supposedly new) shared state in the registry.
+	 * Register a reference to the given shared state in the registry.
 	 * This does the following: We check if the state handle is actually new by the
 	 * registrationKey. If it is new, we register it with a reference count of 1. If there is
 	 * already a state handle registered under the given key, we dispose the given "new" state
@@ -62,14 +62,14 @@ public class SharedStateRegistry {
 	 * a replacement with the result.
 	 *
 	 * <p>IMPORTANT: caller should check the state handle returned by the result, because the
-	 * registry is performing deduplication and could potentially return a handle that is supposed
+	 * registry is performing de-duplication and could potentially return a handle that is supposed
 	 * to replace the one from the registration request.
 	 *
 	 * @param state the shared state for which we register a reference.
 	 * @return the result of this registration request, consisting of the state handle that is
 	 * registered under the key by the end of the oepration and its current reference count.
 	 */
-	public Result registerNewReference(SharedStateRegistryKey registrationKey, StreamStateHandle state) {
+	public Result registerReference(SharedStateRegistryKey registrationKey, StreamStateHandle state) {
 
 		Preconditions.checkNotNull(state);
 
@@ -96,28 +96,6 @@ public class SharedStateRegistry {
 	}
 
 	/**
-	 * Obtains one reference to the given shared state in the registry. This increases the
-	 * reference count by one.
-	 *
-	 * @param registrationKey the shared state for which we obtain a reference.
-	 * @return the shared state for which we release a reference.
-	 * @return the result of the request, consisting of the reference count after this operation
-	 * and the state handle.
-	 */
-	public Result obtainReference(SharedStateRegistryKey registrationKey) {
-
-		Preconditions.checkNotNull(registrationKey);
-
-		synchronized (registeredStates) {
-			SharedStateRegistry.SharedStateEntry entry =
-				Preconditions.checkNotNull(registeredStates.get(registrationKey),
-					"Could not find a state for the given registration key!");
-			entry.increaseReferenceCount();
-			return new Result(entry);
-		}
-	}
-
-	/**
 	 * Releases one reference to the given shared state in the registry. This decreases the
 	 * reference count by one. Once the count reaches zero, the shared state is deleted.
 	 *
@@ -125,7 +103,7 @@ public class SharedStateRegistry {
 	 * @return the result of the request, consisting of the reference count after this operation
 	 * and the state handle, or null if the state handle was deleted through this request.
 	 */
-	public Result releaseReference(SharedStateRegistryKey registrationKey) {
+	public Result unregisterReference(SharedStateRegistryKey registrationKey) {
 
 		Preconditions.checkNotNull(registrationKey);
 
@@ -172,30 +150,18 @@ public class SharedStateRegistry {
 		}
 	}
 
-	/**
-	 * Unregister all the shared states referenced by the given.
-	 *
-	 * @param stateHandles The shared states to unregister.
-	 */
-	public void unregisterAll(Iterable<? extends CompositeStateHandle> stateHandles) {
-		if (stateHandles == null) {
-			return;
-		}
-
-		synchronized (registeredStates) {
-			for (CompositeStateHandle stateHandle : stateHandles) {
-				stateHandle.unregisterSharedStates(this);
-			}
-		}
-	}
-
 	private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
-		if (streamStateHandle != null) {
+		// Small optimization: do not issue discards for placeholders, because discarding them is a NOP.
+		if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
 			asyncDisposalExecutor.execute(
 				new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
 		}
 	}
 
+	private boolean isPlaceholder(StreamStateHandle stateHandle) {
+		return stateHandle instanceof PlaceholderStreamStateHandle;
+	}
+
 	/**
 	 * An entry in the registry, tracking the handle and the corresponding reference count.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 42703f8..9ba9d35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -91,6 +91,13 @@ public class ByteStreamStateHandle implements StreamStateHandle {
 		return 31 * handleName.hashCode();
 	}
 
+	@Override
+	public String toString() {
+		return "ByteStreamStateHandle{" +
+			"handleName='" + handleName + '\'' +
+			'}';
+	}
+
 	/**
 	 * An input stream view on a byte array.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 9250634..3b44d9a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -639,12 +639,6 @@ public class CheckpointCoordinatorTest {
 			assertEquals(checkpointIdNew, successNew.getCheckpointID());
 			assertTrue(successNew.getOperatorStates().isEmpty());
 
-			// validate that the subtask states in old savepoint have unregister their shared states
-			{
-				verify(subtaskState1, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-				verify(subtaskState2, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-			}
-
 			// validate that the relevant tasks got a confirmation message
 			{
 				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
@@ -925,9 +919,6 @@ public class CheckpointCoordinatorTest {
 			verify(subtaskState1_2, times(1)).discardState();
 
 			// validate that all subtask states in the second checkpoint are not discarded
-			verify(subtaskState2_1, never()).unregisterSharedStates(any(SharedStateRegistry.class));
-			verify(subtaskState2_2, never()).unregisterSharedStates(any(SharedStateRegistry.class));
-			verify(subtaskState2_3, never()).unregisterSharedStates(any(SharedStateRegistry.class));
 			verify(subtaskState2_1, never()).discardState();
 			verify(subtaskState2_2, never()).discardState();
 			verify(subtaskState2_3, never()).discardState();
@@ -951,9 +942,6 @@ public class CheckpointCoordinatorTest {
 			coord.shutdown(JobStatus.FINISHED);
 
 			// validate that the states in the second checkpoint have been discarded
-			verify(subtaskState2_1, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-			verify(subtaskState2_2, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-			verify(subtaskState2_3, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
 			verify(subtaskState2_1, times(1)).discardState();
 			verify(subtaskState2_2, times(1)).discardState();
 			verify(subtaskState2_3, times(1)).discardState();
@@ -1562,10 +1550,6 @@ public class CheckpointCoordinatorTest {
 		verify(subtaskState1, never()).discardState();
 		verify(subtaskState2, never()).discardState();
 
-		// Savepoints are not supposed to have any shared state.
-		verify(subtaskState1, never()).unregisterSharedStates(any(SharedStateRegistry.class));
-		verify(subtaskState2, never()).unregisterSharedStates(any(SharedStateRegistry.class));
-
 		// validate that the relevant tasks got a confirmation message
 		{
 			verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
@@ -2088,15 +2072,6 @@ public class CheckpointCoordinatorTest {
 		// shutdown the store
 		store.shutdown(JobStatus.SUSPENDED);
 
-		// All shared states should be unregistered once the store is shut down
-		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
-			for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
-				for (OperatorSubtaskState subtaskState : taskState.getStates()) {
-					verify(subtaskState, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-				}
-			}
-		}
-
 		// restore the store
 		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 985c662..fb5d7c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -37,11 +37,6 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
  * Test for basic {@link CompletedCheckpointStore} contract.
@@ -114,12 +109,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			expected[i - 1].awaitDiscard();
 			assertTrue(expected[i - 1].isDiscarded());
 			assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
-
-			for (OperatorState operatorState : taskStates) {
-				for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
-					verify(subtaskState, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
-				}
-			}
 		}
 	}
 
@@ -209,7 +198,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		operatorGroupState.put(operatorID, operatorState);
 
 		for (int i = 0; i < numberOfStates; i++) {
-			OperatorSubtaskState subtaskState = mock(OperatorSubtaskState.class);
+			OperatorSubtaskState subtaskState =
+				new TestOperatorSubtaskState();
 
 			operatorState.putState(i, subtaskState);
 		}
@@ -217,18 +207,10 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		return new TestCompletedCheckpoint(new JobID(), id, 0, operatorGroupState, props);
 	}
 
-	protected void resetCheckpoint(Collection<OperatorState> operatorStates) {
-		for (OperatorState operatorState : operatorStates) {
-			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
-				Mockito.reset(subtaskState);
-			}
-		}
-	}
-
 	protected void verifyCheckpointRegistered(Collection<OperatorState> operatorStates, SharedStateRegistry registry) {
 		for (OperatorState operatorState : operatorStates) {
 			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
-				verify(subtaskState, times(1)).registerSharedStates(eq(registry));
+				Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).registered);
 			}
 		}
 	}
@@ -236,7 +218,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	protected void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
 		for (OperatorState operatorState : operatorStates) {
 			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
-				verify(subtaskState, times(1)).discardState();
+				Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).discarded);
 			}
 		}
 	}
@@ -333,4 +315,37 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		}
 	}
 
+	static class TestOperatorSubtaskState extends OperatorSubtaskState {
+		private static final long serialVersionUID = 522580433699164230L;
+		// Lifecycle flags; read by verifyCheckpointRegistered / verifyCheckpointDiscarded.
+		boolean registered;
+		boolean discarded;
+		// Passes null for every super constructor argument; both flags start out false.
+		public TestOperatorSubtaskState() {
+			super(null, null, null, null, null);
+			this.registered = false;
+			this.discarded = false;
+		}
+		// Discarding a second time fails the assertion; discarding also clears 'registered'.
+		@Override
+		public void discardState() {
+			super.discardState();
+			Assert.assertFalse(discarded);
+			discarded = true;
+			registered = false;
+		}
+		// Registering an already discarded state fails the assertion.
+		@Override
+		public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
+			super.registerSharedStates(sharedStateRegistry);
+			Assert.assertFalse(discarded);
+			registered = true;
+		}
+		// Clears both flags so the same instance can be verified again.
+		public void reset() {
+			registered = false;
+			discarded = false;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 589ff46..0bbb961 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -100,7 +100,6 @@ public class CompletedCheckpointTest {
 		checkpoint.discardOnSubsume(sharedStateRegistry);
 
 		verify(state, times(1)).discardState();
-		verify(state, times(1)).unregisterSharedStates(sharedStateRegistry);
 	}
 
 	/**
@@ -138,7 +137,6 @@ public class CompletedCheckpointTest {
 			checkpoint.discardOnShutdown(status, sharedStateRegistry);
 			verify(state, times(0)).discardState();
 			assertEquals(true, file.exists());
-			verify(state, times(0)).unregisterSharedStates(sharedStateRegistry);
 
 			// Discard
 			props = new CheckpointProperties(false, false, true, true, true, true, true);
@@ -152,7 +150,6 @@ public class CompletedCheckpointTest {
 
 			checkpoint.discardOnShutdown(status, sharedStateRegistry);
 			verify(state, times(1)).discardState();
-			verify(state, times(1)).unregisterSharedStates(sharedStateRegistry);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 6df01a0..a96b597 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -197,7 +197,6 @@ public class PendingCheckpointTest {
 
 		OperatorState state = mock(OperatorState.class);
 		doNothing().when(state).registerSharedStates(any(SharedStateRegistry.class));
-		doNothing().when(state).unregisterSharedStates(any(SharedStateRegistry.class));
 
 		String targetDir = tmpFolder.newFolder().getAbsolutePath();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 0d93289..44c802b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
@@ -100,11 +101,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
-		resetCheckpoint(expected[0].getOperatorStates().values());
-		resetCheckpoint(expected[1].getOperatorStates().values());
-		resetCheckpoint(expected[2].getOperatorStates().values());
-
-		// Recover TODO!!! clear registry!
+		// Recover
 		checkpoints.recover();
 
 		assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());

http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index b63782d..f985573 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle.StateMetaInfo;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
@@ -273,18 +272,17 @@ public class CheckpointTestUtils {
 	private CheckpointTestUtils() {}
 
 
-	private static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
+	public static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
 		return new IncrementalKeyedStateHandle(
 			createRandomUUID(rnd).toString(),
 			new KeyGroupRange(1, 1),
 			42L,
-			createRandomOwnedHandleMap(rnd),
-			createRandomReferencedHandleMap(rnd),
-			createRandomOwnedHandleMap(rnd),
+			createRandomStateHandleMap(rnd),
+			createRandomStateHandleMap(rnd),
 			createDummyStreamStateHandle(rnd));
 	}
 
-	private static Map<StateHandleID, StreamStateHandle> createRandomOwnedHandleMap(Random rnd) {
+	public static Map<StateHandleID, StreamStateHandle> createRandomStateHandleMap(Random rnd) {
 		final int size = rnd.nextInt(4);
 		Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
 		for (int i = 0; i < size; ++i) {
@@ -296,24 +294,13 @@ public class CheckpointTestUtils {
 		return result;
 	}
 
-	private static Map<StateHandleID, StreamStateHandle> createRandomReferencedHandleMap(Random rnd) {
-		final int size = rnd.nextInt(4);
-		Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
-		for (int i = 0; i < size; ++i) {
-			StateHandleID randomId = new StateHandleID(createRandomUUID(rnd).toString());
-			result.put(randomId, new PlaceholderStreamStateHandle(rnd.nextInt(1024)));
-		}
-
-		return result;
-	}
-
-	private static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) {
+	public static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) {
 		return new KeyGroupsStateHandle(
 			new KeyGroupRangeOffsets(1, 1, new long[]{rnd.nextInt(1024)}),
 			createDummyStreamStateHandle(rnd));
 	}
 
-	private static StreamStateHandle createDummyStreamStateHandle(Random rnd) {
+	public static StreamStateHandle createDummyStreamStateHandle(Random rnd) {
 		return new TestByteStreamStateHandleDeepCompare(
 			String.valueOf(createRandomUUID(rnd)),
 			String.valueOf(createRandomUUID(rnd)).getBytes(ConfigConstants.DEFAULT_CHARSET));