You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/03 07:57:41 UTC

[1/2] flink git commit: [FLINK-5173] Upgrade RocksDB dependency

Repository: flink
Updated Branches:
  refs/heads/master 4e336c692 -> e4f802dd5


[FLINK-5173] Upgrade RocksDB dependency


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e98a935
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e98a935
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e98a935

Branch: refs/heads/master
Commit: 6e98a9357903b113f76b2a7e91e2049c6e88fb35
Parents: 4e336c6
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun Nov 13 05:04:03 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Dec 2 18:00:30 2016 +0100

----------------------------------------------------------------------
 flink-contrib/flink-statebackend-rocksdb/pom.xml                 | 2 +-
 .../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e98a935/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index ef5e563..f2607e3 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -57,7 +57,7 @@ under the License.
 		<dependency>
 			<groupId>org.rocksdb</groupId>
 			<artifactId>rocksdbjni</artifactId>
-			<version>4.5.1</version>
+			<version>4.11.2</version>
 		</dependency>
 
 		<!-- test dependencies -->

http://git-wip-us.apache.org/repos/asf/flink/blob/6e98a935/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f332d1e..e498b34 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -231,9 +231,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		// Dispose decoupled db
 		if (cleanupRockDBReference != null) {
 			for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) {
-				column.f0.dispose();
+				column.f0.close();
 			}
-			cleanupRockDBReference.dispose();
+			cleanupRockDBReference.close();
 		}
 
 		try {


[2/2] flink git commit: [FLINK-5146] Improved resource cleanup in RocksDB keyed state backend

Posted by al...@apache.org.
[FLINK-5146] Improved resource cleanup in RocksDB keyed state backend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4f802dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4f802dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4f802dd

Branch: refs/heads/master
Commit: e4f802dd502f38b922f668c2813728d5511ca289
Parents: 6e98a93
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sat Nov 12 21:13:28 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Dec 2 18:10:13 2016 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 312 ++++++++++------
 .../state/RocksDBStateBackendTest.java          | 364 ++++++++++++++++++-
 .../io/async/AbstractAsyncIOCallable.java       |   2 +-
 .../runtime/io/async/AsyncDoneCallback.java     |   4 +-
 .../async/AsyncStoppableTaskWithCallback.java   |   8 +-
 .../memory/MemCheckpointStreamFactory.java      |   2 +-
 .../api/operators/OperatorSnapshotResult.java   |   9 +-
 .../streaming/runtime/tasks/StreamTask.java     |  67 ++--
 .../flink/core/testutils/OneShotLatch.java      |   9 +
 9 files changed, 624 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index e498b34..bc5b17d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -61,7 +61,7 @@ import org.rocksdb.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -103,14 +103,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * asynchronous checkpoints and when disposing the DB. Otherwise, the asynchronous snapshot might try
 	 * iterating over a disposed DB. After aquriring the lock, always first check if (db == null).
 	 */
-	private final SerializableObject dbDisposeLock = new SerializableObject();
+	private final SerializableObject asyncSnapshotLock = new SerializableObject();
 
 	/**
 	 * Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState}
 	 * to store state. The different k/v states that we have don't each have their own RocksDB
 	 * instance. They all write to this instance but to their own column family.
 	 */
-	@GuardedBy("dbDisposeLock")
 	protected RocksDB db;
 
 	/**
@@ -136,7 +135,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	) throws Exception {
 
 		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
-
 		this.operatorIdentifier = operatorIdentifier;
 		this.jobId = jobId;
 		this.columnOptions = columnFamilyOptions;
@@ -206,8 +204,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
 		}
 
-		RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this);
-		restoreOperation.doRestore(restoreState);
+		try {
+			RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this);
+			restoreOperation.doRestore(restoreState);
+		} catch (Exception ex) {
+			dispose();
+			throw ex;
+		}
 	}
 
 	/**
@@ -217,23 +220,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	public void dispose() {
 		super.dispose();
 
-		final RocksDB cleanupRockDBReference;
-
-		// Acquire the log on dbDisposeLock, so that no ongoing snapshots access the db during cleanup
-		synchronized (dbDisposeLock) {
+		// Acquire the lock, so that no ongoing snapshots access the db during cleanup
+		synchronized (asyncSnapshotLock) {
 			// IMPORTANT: null reference to signal potential async checkpoint workers that the db was disposed, as
 			// working on the disposed object results in SEGFAULTS. Other code has to check field #db for null
 			// and access it in a synchronized block that locks on #dbDisposeLock.
-			cleanupRockDBReference = db;
-			db = null;
-		}
+			if (db != null) {
 
-		// Dispose decoupled db
-		if (cleanupRockDBReference != null) {
-			for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) {
-				column.f0.close();
+				for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) {
+					column.f0.close();
+				}
+
+				kvStateInformation.clear();
+
+				db.close();
+				db = null;
 			}
-			cleanupRockDBReference.close();
 		}
 
 		try {
@@ -252,10 +254,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
 	 * be called by the same thread.
 	 *
-	 * @param checkpointId The Id of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
+	 * @param checkpointId  The Id of the checkpoint.
+	 * @param timestamp     The timestamp of the checkpoint.
 	 * @param streamFactory The factory that we can use for writing our state to streams.
-	 *
 	 * @return Future to the state handle of the snapshot data.
 	 * @throws Exception
 	 */
@@ -267,14 +268,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		long startTime = System.currentTimeMillis();
 
-		if (kvStateInformation.isEmpty()) {
-			LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + " . Returning null.");
-			return new DoneFuture<>(null);
-		}
-
 		final RocksDBSnapshotOperation snapshotOperation = new RocksDBSnapshotOperation(this, streamFactory);
 		// hold the db lock while operation on the db to guard us against async db disposal
-		synchronized (dbDisposeLock) {
+		synchronized (asyncSnapshotLock) {
+
+			if (kvStateInformation.isEmpty()) {
+				LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
+						" . Returning null.");
+
+				return new DoneFuture<>(null);
+			}
+
 			if (db != null) {
 				snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
 			} else {
@@ -295,18 +299,18 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					@Override
 					public KeyGroupsStateHandle performOperation() throws Exception {
 						long startTime = System.currentTimeMillis();
-						try {
-							// hold the db lock while operation on the db to guard us against async db disposal
-							synchronized (dbDisposeLock) {
-								if (db != null) {
-									snapshotOperation.writeDBSnapshot();
-								} else {
+						synchronized (asyncSnapshotLock) {
+							try {
+								// hold the db lock while operation on the db to guard us against async db disposal
+								if (db == null) {
 									throw new IOException("RocksDB closed.");
 								}
-							}
 
-						} finally {
-							snapshotOperation.closeCheckpointStream();
+								snapshotOperation.writeDBSnapshot();
+
+							} finally {
+								snapshotOperation.closeCheckpointStream();
+							}
 						}
 
 						LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", asynchronous part) in thread " +
@@ -315,15 +319,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						return snapshotOperation.getSnapshotResultStateHandle();
 					}
 
-					@Override
-					public void done() {
+					private void releaseSnapshotOperationResources(boolean canceled) {
 						// hold the db lock while operation on the db to guard us against async db disposal
-						synchronized (dbDisposeLock) {
-							if (db != null) {
-								snapshotOperation.releaseDBSnapshot();
-							}
+						synchronized (asyncSnapshotLock) {
+							snapshotOperation.releaseSnapshotResources(canceled);
 						}
 					}
+
+					@Override
+					public void done(boolean canceled) {
+						releaseSnapshotOperationResources(canceled);
+					}
 				};
 
 		LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " +
@@ -348,14 +354,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private long checkpointTimeStamp;
 
 		private Snapshot snapshot;
+		private ReadOptions readOptions;
 		private CheckpointStreamFactory.CheckpointStateOutputStream outStream;
 		private DataOutputView outputView;
 		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
 		private KeyGroupsStateHandle snapshotResultStateHandle;
 
-
-
-		public RocksDBSnapshotOperation(
+		RocksDBSnapshotOperation(
 				RocksDBKeyedStateBackend<?> stateBackend,
 				CheckpointStreamFactory checkpointStreamFactory) {
 
@@ -397,7 +402,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws IOException
 		 */
 		public void writeDBSnapshot() throws IOException, InterruptedException {
-			Preconditions.checkNotNull(snapshot, "No ongoing snapshot to write.");
+
+			if (null == snapshot) {
+				throw new IOException("No snapshot available. Might be released due to cancellation.");
+			}
+
 			Preconditions.checkNotNull(outStream, "No output stream to write snapshot.");
 			writeKVStateMetaData();
 			writeKVStateData();
@@ -412,6 +421,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			if (outStream != null) {
 				stateBackend.cancelStreamRegistry.unregisterClosable(outStream);
 				snapshotResultStateHandle = closeSnapshotStreamAndGetHandle();
+			} else {
+				snapshotResultStateHandle = null;
 			}
 		}
 
@@ -419,13 +430,36 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * 5) Release the snapshot object for RocksDB and clean up.
 		 *
 		 */
-		public void releaseDBSnapshot() {
-			Preconditions.checkNotNull(snapshot, "No ongoing snapshot to release.");
-			stateBackend.db.releaseSnapshot(snapshot);
-			snapshot = null;
-			outStream = null;
-			outputView = null;
-			kvStateIterators = null;
+		public void releaseSnapshotResources(boolean canceled) {
+			if (null != kvStateIterators) {
+				for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) {
+					kvStateIterator.f0.close();
+				}
+				kvStateIterators = null;
+			}
+
+			if (null != snapshot) {
+				if(null != stateBackend.db) {
+					stateBackend.db.releaseSnapshot(snapshot);
+				}
+				snapshot.close();
+				snapshot = null;
+			}
+
+			if (null != readOptions) {
+				readOptions.close();
+				readOptions = null;
+			}
+
+			if (canceled) {
+				try {
+					if (null != snapshotResultStateHandle) {
+						snapshotResultStateHandle.discardState();
+					}
+				} catch (Exception ignored) {
+					LOG.warn("Exception occurred during snapshot state handle cleanup: " + ignored);
+				}
+			}
 		}
 
 		/**
@@ -462,7 +496,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				InstantiationUtil.serializeObject(outStream, column.getValue().f1);
 
 				//retrieve iterator for this k/v states
-				ReadOptions readOptions = new ReadOptions();
+				readOptions = new ReadOptions();
 				readOptions.setSnapshot(snapshot);
 				RocksIterator iterator = stateBackend.db.newIterator(column.getValue().f0, readOptions);
 				kvStateIterators.add(new Tuple2<>(iterator, kvStateId));
@@ -472,59 +506,64 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private void writeKVStateData() throws IOException, InterruptedException {
 
-			RocksDBMergeIterator iterator = new RocksDBMergeIterator(kvStateIterators, stateBackend.keyGroupPrefixBytes);
-
 			byte[] previousKey = null;
 			byte[] previousValue = null;
 
-			//preamble: setup with first key-group as our lookahead
-			if (iterator.isValid()) {
-				//begin first key-group by recording the offset
-				keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos());
-				//write the k/v-state id as metadata
-				//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-				outputView.writeShort(iterator.kvStateId());
-				previousKey = iterator.key();
-				previousValue = iterator.value();
-				iterator.next();
-			}
+			List<Tuple2<RocksIterator, Integer>> kvStateIteratorsHandover = this.kvStateIterators;
+			this.kvStateIterators = null;
 
-			//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
-			while (iterator.isValid()) {
+			// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
+			try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
+					kvStateIteratorsHandover, stateBackend.keyGroupPrefixBytes)) {
 
-				assert (!hasMetaDataFollowsFlag(previousKey));
+				//preamble: setup with first key-group as our lookahead
+				if (mergeIterator.isValid()) {
+					//begin first key-group by recording the offset
+					keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
+					//write the k/v-state id as metadata
+					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+					outputView.writeShort(mergeIterator.kvStateId());
+					previousKey = mergeIterator.key();
+					previousValue = mergeIterator.value();
+					mergeIterator.next();
+				}
 
-				//set signal in first key byte that meta data will follow in the stream after this k/v pair
-				if (iterator.isNewKeyGroup() || iterator.isNewKeyValueState()) {
+				//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
+				while (mergeIterator.isValid()) {
 
-					//be cooperative and check for interruption from time to time in the hot loop
-					checkInterrupted();
+					assert (!hasMetaDataFollowsFlag(previousKey));
 
-					setMetaDataFollowsFlagInKey(previousKey);
-				}
+					//set signal in first key byte that meta data will follow in the stream after this k/v pair
+					if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
 
-				writeKeyValuePair(previousKey, previousValue);
+						//be cooperative and check for interruption from time to time in the hot loop
+						checkInterrupted();
 
-				//write meta data if we have to
-				if (iterator.isNewKeyGroup()) {
-					//
-					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-					outputView.writeShort(END_OF_KEY_GROUP_MARK);
-					//begin new key-group
-					keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos());
-					//write the kev-state
-					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-					outputView.writeShort(iterator.kvStateId());
-				} else if (iterator.isNewKeyValueState()) {
-					//write the k/v-state
-					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-					outputView.writeShort(iterator.kvStateId());
-				}
+						setMetaDataFollowsFlagInKey(previousKey);
+					}
 
-				//request next k/v pair
-				previousKey = iterator.key();
-				previousValue = iterator.value();
-				iterator.next();
+					writeKeyValuePair(previousKey, previousValue);
+
+					//write meta data if we have to
+					if (mergeIterator.isNewKeyGroup()) {
+						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+						outputView.writeShort(END_OF_KEY_GROUP_MARK);
+						//begin new key-group
+						keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
+						//write the kev-state
+						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+						outputView.writeShort(mergeIterator.kvStateId());
+					} else if (mergeIterator.isNewKeyValueState()) {
+						//write the k/v-state
+						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+						outputView.writeShort(mergeIterator.kvStateId());
+					}
+
+					//request next k/v pair
+					previousKey = mergeIterator.key();
+					previousValue = mergeIterator.value();
+					mergeIterator.next();
+				}
 			}
 
 			//epilogue: write last key-group
@@ -540,11 +579,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() throws IOException {
 			StreamStateHandle stateHandle = outStream.closeAndGetHandle();
 			outStream = null;
-			if (stateHandle != null) {
-				return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
-			} else {
-				throw new IOException("Output stream returned null on close.");
-			}
+			return stateHandle != null ? new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null;
 		}
 
 		private void writeKeyValuePair(byte[] key, byte[] value) throws IOException {
@@ -566,7 +601,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private static void checkInterrupted() throws InterruptedException {
 			if(Thread.currentThread().isInterrupted()) {
-				throw new InterruptedException("Snapshot canceled.");
+				throw new InterruptedException("RocksDB snapshot interrupted.");
 			}
 		}
 	}
@@ -655,7 +690,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			//restore the empty columns for the k/v states through the metadata
 			for (int i = 0; i < numColumns; i++) {
 
-				StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) InstantiationUtil.deserializeObject(
+				StateDescriptor<?, ?> stateDescriptor = InstantiationUtil.deserializeObject(
 						currentStateHandleInStream,
 						rocksDBKeyedStateBackend.userCodeClassLoader);
 
@@ -829,13 +864,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		public int getKvStateId() {
 			return kvStateId;
 		}
+
+		public void close() {
+			this.iterator.close();
+		}
 	}
 
 	/**
 	 * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
 	 * The resulting iteration sequence is ordered by (key-group, kv-state).
 	 */
-	static final class RocksDBMergeIterator {
+	static final class RocksDBMergeIterator implements Closeable {
 
 		private final PriorityQueue<MergeIterator> heap;
 		private final int keyGroupPrefixByteCount;
@@ -845,18 +884,29 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private MergeIterator currentSubIterator;
 
-		RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) throws IOException {
+		private static final List<Comparator<MergeIterator>> COMPARATORS;
+
+		static {
+			int maxBytes = 4;
+			COMPARATORS = new ArrayList<>(maxBytes);
+			for (int i = 0; i < maxBytes; ++i) {
+				final int currentBytes = i;
+				COMPARATORS.add(new Comparator<MergeIterator>() {
+					@Override
+					public int compare(MergeIterator o1, MergeIterator o2) {
+						int arrayCmpRes = compareKeyGroupsForByteArrays(
+								o1.currentKey, o2.currentKey, currentBytes);
+						return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
+					}
+				});
+			}
+		}
+
+		RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) {
 			Preconditions.checkNotNull(kvStateIterators);
 			this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
 
-			Comparator<MergeIterator> iteratorComparator = new Comparator<MergeIterator>() {
-				@Override
-				public int compare(MergeIterator o1, MergeIterator o2) {
-					int arrayCmpRes = compareKeyGroupsForByteArrays(
-							o1.currentKey, o2.currentKey, keyGroupPrefixByteCount);
-					return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
-				}
-			};
+			Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount);
 
 			if (kvStateIterators.size() > 0) {
 				this.heap = new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
@@ -866,8 +916,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					rocksIterator.seekToFirst();
 					if (rocksIterator.isValid()) {
 						heap.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+					} else {
+						rocksIterator.close();
 					}
 				}
+
+				kvStateIterators.clear();
+
 				this.valid = !heap.isEmpty();
 				this.currentSubIterator = heap.poll();
 			} else {
@@ -901,14 +956,18 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					newKVState = currentSubIterator.getIterator() != rocksIterator;
 					detectNewKeyGroup(oldKey);
 				}
-			} else if (heap.isEmpty()) {
-				valid = false;
 			} else {
-				currentSubIterator = heap.poll();
-				newKVState = true;
-				detectNewKeyGroup(oldKey);
-			}
+				rocksIterator.close();
 
+				if (heap.isEmpty()) {
+					currentSubIterator = null;
+					valid = false;
+				} else {
+					currentSubIterator = heap.poll();
+					newKVState = true;
+					detectNewKeyGroup(oldKey);
+				}
+			}
 		}
 
 		private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
@@ -986,6 +1045,21 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 			return 0;
 		}
+
+		@Override
+		public void close() {
+
+			if (null != currentSubIterator) {
+				currentSubIterator.close();
+				currentSubIterator = null;
+			}
+
+			for (MergeIterator iterator : heap) {
+				iterator.close();
+			}
+
+			heap.clear();
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9d25434..314717b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -18,26 +18,72 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.util.OperatingSystem;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksObject;
+import org.rocksdb.Snapshot;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.RunnableFuture;
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
 
 /**
  * Tests for the partitioned state part of {@link RocksDBStateBackend}.
  */
 public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBackend> {
 
+	private OneShotLatch blocker;
+	private OneShotLatch waiter;
+	private BlockerCheckpointStreamFactory testStreamFactory;
+	private RocksDBKeyedStateBackend<Integer> keyedStateBackend;
+	private List<RocksObject> allCreatedCloseables;
+	private ValueState<Integer> testState1;
+	private ValueState<String> testState2;
+
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();
 
 	@Before
-	public void checkOperatingSystem() {
+	public void checkOS() throws Exception {
 		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
 	}
 
@@ -49,4 +95,320 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 		backend.setDbStoragePath(dbPath);
 		return backend;
 	}
+
+	public void setupRocksKeyedStateBackend() throws Exception {
+
+		blocker = new OneShotLatch();
+		waiter = new OneShotLatch();
+		testStreamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
+		testStreamFactory.setBlockerLatch(blocker);
+		testStreamFactory.setWaiterLatch(waiter);
+		testStreamFactory.setAfterNumberInvocations(100);
+
+		RocksDBStateBackend backend = getStateBackend();
+		Environment env = new DummyEnvironment("TestTask", 1, 0);
+
+		keyedStateBackend = (RocksDBKeyedStateBackend<Integer>) backend.createKeyedStateBackend(
+				env,
+				new JobID(),
+				"Test",
+				IntSerializer.INSTANCE,
+				2,
+				new KeyGroupRange(0, 1),
+				mock(TaskKvStateRegistry.class));
+
+		testState1 = keyedStateBackend.getPartitionedState(
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE,
+				new ValueStateDescriptor<>("TestState-1", Integer.class, 0));
+
+		testState2 = keyedStateBackend.getPartitionedState(
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE,
+				new ValueStateDescriptor<>("TestState-2", String.class, ""));
+
+		allCreatedCloseables = new ArrayList<>();
+
+		keyedStateBackend.db = spy(keyedStateBackend.db);
+
+		doAnswer(new Answer<Object>() {
+
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				RocksIterator rocksIterator = spy((RocksIterator) invocationOnMock.callRealMethod());
+				allCreatedCloseables.add(rocksIterator);
+				return rocksIterator;
+			}
+		}).when(keyedStateBackend.db).newIterator(any(ColumnFamilyHandle.class), any(ReadOptions.class));
+
+		doAnswer(new Answer<Object>() {
+
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				Snapshot snapshot = spy((Snapshot) invocationOnMock.callRealMethod());
+				allCreatedCloseables.add(snapshot);
+				return snapshot;
+			}
+		}).when(keyedStateBackend.db).getSnapshot();
+
+		doAnswer(new Answer<Object>() {
+
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				ColumnFamilyHandle snapshot = spy((ColumnFamilyHandle) invocationOnMock.callRealMethod());
+				allCreatedCloseables.add(snapshot);
+				return snapshot;
+			}
+		}).when(keyedStateBackend.db).createColumnFamily(any(ColumnFamilyDescriptor.class));
+
+		for (int i = 0; i < 100; ++i) {
+			keyedStateBackend.setCurrentKey(i);
+			testState1.update(4200 + i);
+			testState2.update("S-" + (4200 + i));
+		}
+	}
+
+	/**
+	 * Disposes the backend while a snapshot is pending, then runs the snapshot and checks that its
+	 * RocksDB objects stay open until the snapshot runs and are closed exactly once afterwards.
+	 */
+	@Test
+	public void testRunningSnapshotAfterBackendClosed() throws Exception {
+		setupRocksKeyedStateBackend();
+		RunnableFuture<KeyGroupsStateHandle> pendingSnapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+
+		final RocksDB rocksDb = keyedStateBackend.db;
+		verify(rocksDb, times(1)).getSnapshot();
+		verify(rocksDb, times(0)).releaseSnapshot(any(Snapshot.class));
+
+		keyedStateBackend.dispose();
+		verify(rocksDb, times(1)).close();
+		assertEquals(null, keyedStateBackend.db);
+
+		// disposing the backend must leave the objects of the pending snapshot open
+		for (RocksObject trackedObject : allCreatedCloseables) {
+			verify(trackedObject, times(0)).close();
+		}
+
+		Thread snapshotRunner = new Thread(pendingSnapshot);
+		snapshotRunner.start();
+		try {
+			pendingSnapshot.get();
+			fail();
+		} catch (Exception expected) {
+			// the snapshot is expected to fail
+		}
+		snapshotRunner.join();
+
+		// running the snapshot releases every object exactly once
+		for (RocksObject trackedObject : allCreatedCloseables) {
+			verify(trackedObject, times(1)).close();
+		}
+	}
+
+	/**
+	 * Disposes the backend while a snapshot is pending, then cancels the never-started snapshot and
+	 * checks that its RocksDB objects are closed exactly once by the cancellation.
+	 */
+	@Test
+	public void testReleasingSnapshotAfterBackendClosed() throws Exception {
+		setupRocksKeyedStateBackend();
+		RunnableFuture<KeyGroupsStateHandle> pendingSnapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+
+		final RocksDB rocksDb = keyedStateBackend.db;
+		verify(rocksDb, times(1)).getSnapshot();
+		verify(rocksDb, times(0)).releaseSnapshot(any(Snapshot.class));
+
+		keyedStateBackend.dispose();
+		verify(rocksDb, times(1)).close();
+		assertEquals(null, keyedStateBackend.db);
+
+		// disposing the backend must leave the objects of the pending snapshot open
+		for (RocksObject trackedObject : allCreatedCloseables) {
+			verify(trackedObject, times(0)).close();
+		}
+
+		pendingSnapshot.cancel(true);
+
+		// cancelling releases every object exactly once
+		for (RocksObject trackedObject : allCreatedCloseables) {
+			verify(trackedObject, times(1)).close();
+		}
+	}
+
+	/** Cancels a snapshot that never ran and checks that all RocksDB resources were released. */
+	@Test
+	public void testDismissingSnapshot() throws Exception {
+		setupRocksKeyedStateBackend();
+		RunnableFuture<KeyGroupsStateHandle> neverStarted = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		neverStarted.cancel(true);
+		verifyRocksObjectsReleased();
+	}
+
+	/**
+	 * Cancels a snapshot and only afterwards hands it to a thread; checks that it does not complete
+	 * and that all RocksDB resources were released.
+	 */
+	@Test
+	public void testDismissingSnapshotNotRunnable() throws Exception {
+		setupRocksKeyedStateBackend();
+		RunnableFuture<KeyGroupsStateHandle> cancelledSnapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		cancelledSnapshot.cancel(true);
+
+		Thread snapshotRunner = new Thread(cancelledSnapshot);
+		snapshotRunner.start();
+		try {
+			cancelledSnapshot.get();
+			fail();
+		} catch (Exception expected) {
+			// get() on a cancelled future must throw
+		}
+		snapshotRunner.join();
+
+		verifyRocksObjectsReleased();
+	}
+
+	/**
+	 * Runs a snapshot to completion while the state is modified concurrently, then checks the
+	 * resulting handle, that the stream was closed, and that all RocksDB resources were released.
+	 */
+	@Test
+	public void testCompletingSnapshot() throws Exception {
+		setupRocksKeyedStateBackend();
+		RunnableFuture<KeyGroupsStateHandle> runningSnapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		Thread snapshotRunner = new Thread(runningSnapshot);
+		snapshotRunner.start();
+
+		// wait until the snapshot requested its output stream, then re-arm the latch
+		waiter.await();
+		waiter.reset();
+
+		// modify the state while the snapshot is in progress
+		runStateUpdates();
+
+		// let the stream write past its blocking point and wait for it to signal progress
+		blocker.trigger();
+		waiter.await();
+
+		KeyGroupsStateHandle stateHandle = runningSnapshot.get();
+		assertNotNull(stateHandle);
+		assertTrue(stateHandle.getStateSize() > 0);
+		assertEquals(2, stateHandle.getNumberOfKeyGroups());
+		assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+
+		snapshotRunner.join();
+		verifyRocksObjectsReleased();
+	}
+
+	/**
+	 * Cancels a snapshot while it is writing and checks that its stream gets closed, that the
+	 * snapshot does not complete, and that all RocksDB resources are released.
+	 */
+	@Test
+	public void testCancelRunningSnapshot() throws Exception {
+		setupRocksKeyedStateBackend();
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		Thread asyncSnapshotThread = new Thread(snapshot);
+		asyncSnapshotThread.start();
+		waiter.await(); // wait for snapshot to run
+		waiter.reset();
+		runStateUpdates();
+		blocker.trigger(); // allow checkpointing to start writing
+		snapshot.cancel(true);
+		assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+		waiter.await(); // wait for snapshot stream writing to run
+		try {
+			snapshot.get();
+			fail();
+		} catch (Exception ignored) {
+			// expected: the snapshot was cancelled
+		}
+
+		// get() returns as soon as the future is cancelled, but the snapshot thread may still be
+		// running and releasing resources; wait for it before verifying the cleanup
+		asyncSnapshotThread.join();
+		verifyRocksObjectsReleased();
+	}
+
+	private void runStateUpdates() throws Exception{
+		for (int i = 50; i < 150; ++i) {
+			if (i % 10 == 0) {
+				Thread.sleep(1);
+			}
+			keyedStateBackend.setCurrentKey(i);
+			testState1.update(4200 + i);
+			testState2.update("S-" + (4200 + i));
+		}
+	}
+
+	/**
+	 * Verifies that every tracked RocksDB object was closed exactly once and that exactly one
+	 * snapshot was taken and released. Then disposes the backend and checks that the database
+	 * was closed and cleared.
+	 */
+	private void verifyRocksObjectsReleased() {
+		//Ensure every RocksObject was closed exactly once
+		for (RocksObject rocksCloseable : allCreatedCloseables) {
+			verify(rocksCloseable, times(1)).close();
+		}
+
+		// the backend must not have been disposed yet
+		assertNotNull(keyedStateBackend.db);
+		RocksDB spyDB = keyedStateBackend.db;
+
+		verify(spyDB, times(1)).getSnapshot();
+		verify(spyDB, times(1)).releaseSnapshot(any(Snapshot.class));
+
+		keyedStateBackend.dispose();
+		verify(spyDB, times(1)).close();
+		assertEquals(null, keyedStateBackend.db);
+	}
+
+	static class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
+
+		private final int maxSize;
+		private int afterNumberInvocations;
+		private OneShotLatch blocker;
+		private OneShotLatch waiter;
+
+		MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
+
+		public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
+			return lastCreatedStream;
+		}
+
+		public BlockerCheckpointStreamFactory(int maxSize) {
+			this.maxSize = maxSize;
+		}
+
+		public void setAfterNumberInvocations(int afterNumberInvocations) {
+			this.afterNumberInvocations = afterNumberInvocations;
+		}
+
+		public void setBlockerLatch(OneShotLatch latch) {
+			this.blocker = latch;
+		}
+
+		public void setWaiterLatch(OneShotLatch latch) {
+			this.waiter = latch;
+		}
+
+		@Override
+		public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+			waiter.trigger();
+			this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
+
+				private int afterNInvocations = afterNumberInvocations;
+				private final OneShotLatch streamBlocker = blocker;
+				private final OneShotLatch streamWaiter = waiter;
+
+				@Override
+				public void write(int b) throws IOException {
+
+					if (afterNInvocations > 0) {
+						--afterNInvocations;
+					}
+
+					if (0 == afterNInvocations && null != streamBlocker) {
+						try {
+							streamBlocker.await();
+						} catch (InterruptedException ignored) {
+						}
+					}
+					try {
+						super.write(b);
+					} catch (IOException ex) {
+						if (null != streamWaiter) {
+							streamWaiter.trigger();
+						}
+						throw ex;
+					}
+
+					if (0 == afterNInvocations && null != streamWaiter) {
+						streamWaiter.trigger();
+					}
+				}
+
+				@Override
+				public void close() {
+					super.close();
+					if (null != streamWaiter) {
+						streamWaiter.trigger();
+					}
+				}
+			};
+
+			return lastCreatedStream;
+		}
+
+		@Override
+		public void close() throws Exception {
+
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
index 989e868..1968d40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
@@ -131,7 +131,7 @@ public abstract class AbstractAsyncIOCallable<V, D extends Closeable> implements
 	 * it finished or was stopped.
 	 */
 	@Override
-	public void done() {
+	public void done(boolean canceled) {
+		// this default implementation ignores 'canceled'; subclasses may override it to react to it
 		//optional callback hook
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
index 13d9057..dcc5525 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
@@ -25,7 +25,9 @@ public interface AsyncDoneCallback {
 
 	/**
 	 * the callback
+	 *
+	 * @param canceled true if the task finished because it was canceled, false otherwise
 	 */
-	void done();
+	void done(boolean canceled);
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
index 8316e4f..1ca109c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
@@ -36,17 +36,13 @@ public class AsyncStoppableTaskWithCallback<V> extends FutureTask<V> {
 
 	@Override
 	public boolean cancel(boolean mayInterruptIfRunning) {
-		
-		if (mayInterruptIfRunning) {
-			stoppableCallbackCallable.stop();
-		}
-
+		// stop the callable even if a running task may not be interrupted (presumably so that it can
+		// release its resources in every cancellation case)
+		stoppableCallbackCallable.stop();
 		return super.cancel(mayInterruptIfRunning);
 	}
 
 	@Override
 	protected void done() {
-		stoppableCallbackCallable.done();
+		// tell the callable whether this task ended because it was cancelled
+		stoppableCallbackCallable.done(isCancelled());
 	}
 
 	public static <V> AsyncStoppableTaskWithCallback<V> from(StoppableCallbackCallable<V> callable) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 30de638..9b2b46f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -144,7 +144,7 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
 				return bytes;
 			}
 			else {
-				throw new IllegalStateException("stream has already been closed");
+				throw new IOException("stream has already been closed");
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 52c89f8..4265edc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -78,4 +78,11 @@ public class OperatorSnapshotResult {
 	public void setOperatorStateRawFuture(RunnableFuture<OperatorStateHandle> operatorStateRawFuture) {
 		this.operatorStateRawFuture = operatorStateRawFuture;
 	}
-}
\ No newline at end of file
+
+	/**
+	 * Cancels (with interruption) all state snapshot futures held by this result. Futures that were
+	 * never set are skipped, so every present future is cancelled even if another one is missing.
+	 */
+	public void cancel() {
+		cancelIfPresent(getKeyedStateManagedFuture());
+		cancelIfPresent(getOperatorStateManagedFuture());
+		cancelIfPresent(getKeyedStateRawFuture());
+		cancelIfPresent(getOperatorStateRawFuture());
+	}
+
+	/** Cancels the given future with interruption; does nothing if it is {@code null}. */
+	private static void cancelIfPresent(RunnableFuture<?> future) {
+		if (future != null) {
+			future.cancel(true);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6595901..fac37c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -942,9 +942,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		@Override
 		public void close() {
-			//TODO Handle other state futures in case we actually run them. Currently they are just DoneFutures.
-			if (futureKeyedBackendStateHandles != null) {
-				futureKeyedBackendStateHandles.cancel(true);
+			// cancel every operator snapshot still in progress so that it cleans up / releases its resources
+			for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) {
+				snapshotResult.cancel();
 			}
 		}
 	}
@@ -985,35 +985,55 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 			startSyncPartNano = System.nanoTime();
 
-			for (StreamOperator<?> op : allOperators) {
+			boolean failed = true;
+			try {
 
-				createStreamFactory(op);
-				snapshotNonPartitionableState(op);
+				for (StreamOperator<?> op : allOperators) {
 
-				OperatorSnapshotResult snapshotInProgress =
-						op.snapshotState(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), streamFactory);
+					createStreamFactory(op);
+					snapshotNonPartitionableState(op);
 
-				snapshotInProgressList.add(snapshotInProgress);
-			}
+					OperatorSnapshotResult snapshotInProgress =
+							op.snapshotState(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), streamFactory);
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
-						checkpointMetaData.getCheckpointId(), owner.getName());
-			}
+					snapshotInProgressList.add(snapshotInProgress);
+				}
+
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
+							checkpointMetaData.getCheckpointId(), owner.getName());
+				}
+
+				startAsyncPartNano = System.nanoTime();
 
-			startAsyncPartNano= System.nanoTime();
+				checkpointMetaData.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
 
-			checkpointMetaData.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
+				// at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
+				runAsyncCheckpointingAndAcknowledge();
+				failed = false;
 
-			runAsyncCheckpointingAndAcknowledge();
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("{} - finished synchronous part of checkpoint {}." +
+									"Alignment duration: {} ms, snapshot duration {} ms",
+							owner.getName(), checkpointMetaData.getCheckpointId(),
+							checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
+							checkpointMetaData.getSyncDurationMillis());
+				}
+			} finally {
+				if (failed) {
+					// Cleanup to release resources
+					for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
+						operatorSnapshotResult.cancel();
+					}
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("{} - finished synchronous part of checkpoint {}." +
-								"Alignment duration: {} ms, snapshot duration {} ms",
-						owner.getName(), checkpointMetaData.getCheckpointId(),
-						checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
-						checkpointMetaData.getSyncDurationMillis());
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
+										"Alignment duration: {} ms, snapshot duration {} ms",
+								owner.getName(), checkpointMetaData.getCheckpointId());
+					}
+				}
 			}
+
 		}
 
 		private void createStreamFactory(StreamOperator<?> operator) throws IOException {
@@ -1051,6 +1071,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 
 		public void runAsyncCheckpointingAndAcknowledge() throws IOException {
+
 			AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
 					owner,
 					nonPartitionedStates,

http://git-wip-us.apache.org/repos/asf/flink/blob/e4f802dd/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index 0418bf5..b3d86e5 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -107,4 +107,13 @@ public final class OneShotLatch {
 	public boolean isTriggered() {
 		return triggered;
 	}
+
+	/**
+	 * Resets the latch to the untriggered state ({@code triggered = false}) under the latch's lock,
+	 * so that the same latch can be triggered again.
+	 */
+	public void reset() {
+		synchronized (lock) {
+			triggered = false;
+		}
+	}
 }