You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/09/25 15:47:51 UTC

[2/2] flink git commit: [FLINK-7619] Improved abstraction of AbstractAsyncIOCallable to better fit the current usage pattern.

[FLINK-7619] Improved abstraction of AbstractAsyncIOCallable to better fit the current usage pattern.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5af463a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5af463a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5af463a9

Branch: refs/heads/master
Commit: 5af463a9c0ff62603bc342a78dfd5483d834e8a7
Parents: 0073204
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Sep 7 11:24:12 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Sep 25 16:04:15 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 198 +++++++++++--------
 .../apache/flink/core/fs/CloseableRegistry.java |   4 +-
 .../flink/core/fs/ClosingFSDataInputStream.java |   4 +-
 .../core/fs/ClosingFSDataOutputStream.java      |   4 +-
 .../core/fs/SafetyNetCloseableRegistry.java     |   8 +-
 .../flink/util/AbstractCloseableRegistry.java   |  11 +-
 .../core/fs/AbstractCloseableRegistryTest.java  |  11 +-
 .../flink/core/fs/CloseableRegistryTest.java    |   2 +-
 .../AbstractAsyncSnapshotIOCallable.java        | 109 ----------
 .../AbstractAsyncCallableWithResources.java     | 194 ++++++++++++++++++
 .../io/async/AbstractAsyncIOCallable.java       | 157 ---------------
 .../flink/runtime/io/async/AsyncStoppable.java  |   4 +-
 .../state/DefaultOperatorStateBackend.java      |  66 +++++--
 .../state/StateInitializationContextImpl.java   |  17 +-
 .../StateSnapshotContextSynchronousImpl.java    |  25 +--
 .../state/heap/HeapKeyedStateBackend.java       |  85 +++++---
 .../streaming/runtime/tasks/StreamTask.java     |   8 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   2 +-
 .../util/AbstractStreamOperatorTestHarness.java |   2 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   3 +-
 20 files changed, 468 insertions(+), 446 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index dd5b852..a1500c7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -43,7 +43,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
+import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -384,8 +384,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		final CheckpointStreamFactory streamFactory) throws Exception {
 
 		long startTime = System.currentTimeMillis();
+		final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
+
+		final RocksDBFullSnapshotOperation<K> snapshotOperation;
 
-		final RocksDBFullSnapshotOperation<K> snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory);
 		// hold the db lock while operation on the db to guard us against async db disposal
 		synchronized (asyncSnapshotLock) {
 
@@ -399,6 +401,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					return DoneFuture.nullValue();
 				}
 
+				snapshotOperation =
+					new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry);
+
 				snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
 			} else {
 				throw new IOException("RocksDB closed.");
@@ -406,30 +411,55 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		// implementation of the async IO operation, based on FutureTask
-		AbstractAsyncIOCallable<KeyedStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
-			new AbstractAsyncIOCallable<KeyedStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
+		AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable =
+			new AbstractAsyncCallableWithResources<KeyedStateHandle>() {
 
 				@Override
-				public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
+				protected void acquireResources() throws Exception {
+					cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
 					snapshotOperation.openCheckpointStream();
-					return snapshotOperation.getOutStream();
 				}
 
 				@Override
-				public KeyGroupsStateHandle performOperation() throws Exception {
-					long startTime = System.currentTimeMillis();
+				protected void releaseResources() throws Exception {
+					closeLocalRegistry();
+					releaseSnapshotOperationResources();
+				}
+
+				private void releaseSnapshotOperationResources() {
+					// hold the db lock while operation on the db to guard us against async db disposal
 					synchronized (asyncSnapshotLock) {
+						snapshotOperation.releaseSnapshotResources();
+					}
+				}
+
+				@Override
+				protected void stopOperation() throws Exception {
+					closeLocalRegistry();
+				}
+
+				private void closeLocalRegistry() {
+					if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
 						try {
-							// hold the db lock while operation on the db to guard us against async db disposal
-							if (db == null) {
-								throw new IOException("RocksDB closed.");
-							}
+							snapshotCloseableRegistry.close();
+						} catch (Exception ex) {
+							LOG.warn("Error closing local registry", ex);
+						}
+					}
+				}
 
-							snapshotOperation.writeDBSnapshot();
+				@Override
+				public KeyGroupsStateHandle performOperation() throws Exception {
+					long startTime = System.currentTimeMillis();
 
-						} finally {
-							snapshotOperation.closeCheckpointStream();
+					synchronized (asyncSnapshotLock) {
+						// hold the db lock while operation on the db to guard us against async db disposal
+						if (db == null) {
+							throw new IOException("RocksDB closed.");
 						}
+
+						snapshotOperation.writeDBSnapshot();
+						snapshotOperation.createSnapshotResultStateHandleFromOutputStream();
 					}
 
 					LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
@@ -437,18 +467,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 					return snapshotOperation.getSnapshotResultStateHandle();
 				}
-
-				private void releaseSnapshotOperationResources(boolean canceled) {
-					// hold the db lock while operation on the db to guard us against async db disposal
-					synchronized (asyncSnapshotLock) {
-						snapshotOperation.releaseSnapshotResources(canceled);
-					}
-				}
-
-				@Override
-				public void done(boolean canceled) {
-					releaseSnapshotOperationResources(canceled);
-				}
 			};
 
 		LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " +
@@ -468,6 +486,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private final RocksDBKeyedStateBackend<K> stateBackend;
 		private final KeyGroupRangeOffsets keyGroupRangeOffsets;
 		private final CheckpointStreamFactory checkpointStreamFactory;
+		private final CloseableRegistry snapshotCloseableRegistry;
 
 		private long checkpointId;
 		private long checkpointTimeStamp;
@@ -482,11 +501,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		RocksDBFullSnapshotOperation(
 			RocksDBKeyedStateBackend<K> stateBackend,
-			CheckpointStreamFactory checkpointStreamFactory) {
+			CheckpointStreamFactory checkpointStreamFactory,
+			CloseableRegistry registry) {
 
 			this.stateBackend = stateBackend;
 			this.checkpointStreamFactory = checkpointStreamFactory;
 			this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange);
+			this.snapshotCloseableRegistry = registry;
 		}
 
 		/**
@@ -510,9 +531,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 */
 		public void openCheckpointStream() throws Exception {
 			Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set.");
-			outStream = checkpointStreamFactory.
-				createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
-			stateBackend.cancelStreamRegistry.registerClosable(outStream);
+			outStream = checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
+			snapshotCloseableRegistry.registerCloseable(outStream);
 			outputView = new DataOutputViewStreamWrapper(outStream);
 		}
 
@@ -537,18 +557,25 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 *
 		 * @throws IOException
 		 */
-		public void closeCheckpointStream() throws IOException {
-			if (outStream != null) {
-				snapshotResultStateHandle = closeSnapshotStreamAndGetHandle();
-			} else {
-				snapshotResultStateHandle = null;
+		public void createSnapshotResultStateHandleFromOutputStream() throws IOException {
+
+			if (snapshotCloseableRegistry.unregisterCloseable(outStream)) {
+
+				StreamStateHandle stateHandle = outStream.closeAndGetHandle();
+				outStream = null;
+
+				if (stateHandle != null) {
+					this.snapshotResultStateHandle = new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+				}
 			}
 		}
 
 		/**
 		 * 5) Release the snapshot object for RocksDB and clean up.
 		 */
-		public void releaseSnapshotResources(boolean canceled) {
+		public void releaseSnapshotResources() {
+
+			outStream = null;
 
 			if (null != kvStateIterators) {
 				for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) {
@@ -569,12 +596,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				IOUtils.closeQuietly(readOptions);
 				readOptions = null;
 			}
+		}
 
-			if (canceled) {
+		/**
+		 * Drop the created snapshot if we have ben cancelled.
+		 */
+		public void dropSnapshotResult() {
+			if (null != snapshotResultStateHandle) {
 				try {
-					if (null != snapshotResultStateHandle) {
-						snapshotResultStateHandle.discardState();
-					}
+					snapshotResultStateHandle.discardState();
 				} catch (Exception e) {
 					LOG.warn("Exception occurred during snapshot state handle cleanup.", e);
 				}
@@ -719,13 +749,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 		}
 
-		private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() throws IOException {
-			stateBackend.cancelStreamRegistry.unregisterClosable(outStream);
-			StreamStateHandle stateHandle = outStream.closeAndGetHandle();
-			outStream = null;
-			return stateHandle != null ? new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null;
-		}
-
 		private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
 			BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
 			BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
@@ -805,11 +828,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				FileSystem backupFileSystem = backupPath.getFileSystem();
 				inputStream = backupFileSystem.open(filePath);
-				closeableRegistry.registerClosable(inputStream);
+				closeableRegistry.registerCloseable(inputStream);
 
 				outputStream = checkpointStreamFactory
 					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
-				closeableRegistry.registerClosable(outputStream);
+				closeableRegistry.registerCloseable(outputStream);
 
 				while (true) {
 					int numBytes = inputStream.read(buffer);
@@ -821,19 +844,19 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					outputStream.write(buffer, 0, numBytes);
 				}
 
-				closeableRegistry.unregisterClosable(outputStream);
-				StreamStateHandle result = outputStream.closeAndGetHandle();
-				outputStream = null;
-
+				StreamStateHandle result = null;
+				if (closeableRegistry.unregisterCloseable(outputStream)) {
+					result = outputStream.closeAndGetHandle();
+					outputStream = null;
+				}
 				return result;
+
 			} finally {
-				if (inputStream != null) {
-					closeableRegistry.unregisterClosable(inputStream);
+				if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) {
 					inputStream.close();
 				}
 
-				if (outputStream != null) {
-					closeableRegistry.unregisterClosable(outputStream);
+				if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) {
 					outputStream.close();
 				}
 			}
@@ -845,7 +868,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			try {
 				outputStream = checkpointStreamFactory
 					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
-				closeableRegistry.registerClosable(outputStream);
+				closeableRegistry.registerCloseable(outputStream);
 
 				//no need for compression scheme support because sst-files are already compressed
 				KeyedBackendSerializationProxy<K> serializationProxy =
@@ -858,15 +881,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				serializationProxy.write(out);
 
-				closeableRegistry.unregisterClosable(outputStream);
-				StreamStateHandle result = outputStream.closeAndGetHandle();
-				outputStream = null;
-
+				StreamStateHandle result = null;
+				if (closeableRegistry.unregisterCloseable(outputStream)) {
+					result = outputStream.closeAndGetHandle();
+					outputStream = null;
+				}
 				return result;
 			} finally {
 				if (outputStream != null) {
-					closeableRegistry.unregisterClosable(outputStream);
-					outputStream.close();
+					if (closeableRegistry.unregisterCloseable(outputStream)) {
+						outputStream.close();
+					}
 				}
 			}
 		}
@@ -905,7 +930,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		KeyedStateHandle materializeSnapshot() throws Exception {
 
-			stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry);
+			stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
 
 			// write meta data
 			metaStateHandle = materializeMetaData();
@@ -954,15 +979,25 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		void stop() {
-			try {
-				closeableRegistry.close();
-			} catch (IOException e) {
-				LOG.warn("Could not properly close io streams.", e);
+
+			if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
+				try {
+					closeableRegistry.close();
+				} catch (IOException e) {
+					LOG.warn("Could not properly close io streams.", e);
+				}
 			}
 		}
 
 		void releaseResources(boolean canceled) {
-			stateBackend.cancelStreamRegistry.unregisterClosable(closeableRegistry);
+
+			if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
+				try {
+					closeableRegistry.close();
+				} catch (IOException e) {
+					LOG.warn("Exception on closing registry.", e);
+				}
+			}
 
 			if (backupPath != null) {
 				try {
@@ -1128,13 +1163,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			throws IOException, StateMigrationException, RocksDBException {
 			try {
 				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
-				rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
+				rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
 				currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
 				restoreKVStateMetaData();
 				restoreKVStateData();
 			} finally {
-				if (currentStateHandleInStream != null) {
-					rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterClosable(currentStateHandleInStream);
+				if (currentStateHandleInStream != null
+					&& rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
 					IOUtils.closeQuietly(currentStateHandleInStream);
 				}
 			}
@@ -1275,7 +1310,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			try {
 				inputStream = metaStateHandle.openInputStream();
-				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+				stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
 
 				KeyedBackendSerializationProxy<T> serializationProxy =
 					new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
@@ -1298,8 +1333,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				return serializationProxy.getStateMetaInfoSnapshots();
 			} finally {
-				if (inputStream != null) {
-					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+				if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
 					inputStream.close();
 				}
 			}
@@ -1316,10 +1350,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			try {
 				inputStream = remoteFileHandle.openInputStream();
-				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+				stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
 
 				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
-				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+				stateBackend.cancelStreamRegistry.registerCloseable(outputStream);
 
 				byte[] buffer = new byte[8 * 1024];
 				while (true) {
@@ -1331,13 +1365,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					outputStream.write(buffer, 0, numBytes);
 				}
 			} finally {
-				if (inputStream != null) {
-					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+				if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
 					inputStream.close();
 				}
 
-				if (outputStream != null) {
-					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+				if (outputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
 					outputStream.close();
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
index 29f363c..87d33d2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -49,7 +49,7 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Obje
 	}
 
 	@Override
-	protected void doUnRegister(@Nonnull Closeable closeable, @Nonnull Map<Closeable, Object> closeableMap) {
-		closeableMap.remove(closeable);
+	protected boolean doUnRegister(@Nonnull Closeable closeable, @Nonnull Map<Closeable, Object> closeableMap) {
+		return closeableMap.remove(closeable) != null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
index 7c97271..173a890 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
@@ -55,7 +55,7 @@ public class ClosingFSDataInputStream
 	public void close() throws IOException {
 		if (!closed) {
 			closed = true;
-			registry.unregisterClosable(this);
+			registry.unregisterCloseable(this);
 			inputStream.close();
 		}
 	}
@@ -93,7 +93,7 @@ public class ClosingFSDataInputStream
 			FSDataInputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException{
 
 		ClosingFSDataInputStream inputStream = new ClosingFSDataInputStream(delegate, registry, debugInfo);
-		registry.registerClosable(inputStream);
+		registry.registerCloseable(inputStream);
 		return inputStream;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
index c517a83..cb7de92 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
@@ -60,7 +60,7 @@ public class ClosingFSDataOutputStream
 	public void close() throws IOException {
 		if (!closed) {
 			closed = true;
-			registry.unregisterClosable(this);
+			registry.unregisterCloseable(this);
 			outputStream.close();
 		}
 	}
@@ -98,7 +98,7 @@ public class ClosingFSDataOutputStream
 			FSDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException {
 
 		ClosingFSDataOutputStream inputStream = new ClosingFSDataOutputStream(delegate, registry, debugInfo);
-		registry.registerClosable(inputStream);
+		registry.registerCloseable(inputStream);
 		return inputStream;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index 6097334..9c4272f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -98,7 +98,7 @@ public class SafetyNetCloseableRegistry extends
 	}
 
 	@Override
-	protected void doUnRegister(
+	protected boolean doUnRegister(
 		@Nonnull WrappingProxyCloseable<? extends Closeable> closeable,
 		@Nonnull Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) {
 
@@ -106,11 +106,7 @@ public class SafetyNetCloseableRegistry extends
 
 		Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate());
 
-		if (null == innerCloseable) {
-			return;
-		}
-
-		closeableMap.remove(innerCloseable);
+		return null != innerCloseable && closeableMap.remove(innerCloseable) != null;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index 4527b5e..14e765c 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -68,7 +68,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 	 * @param closeable Closeable tor register
 	 * @throws IOException exception when the registry was closed before
 	 */
-	public final void registerClosable(C closeable) throws IOException {
+	public final void registerCloseable(C closeable) throws IOException {
 
 		if (null == closeable) {
 			return;
@@ -89,15 +89,16 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 	 * Removes a {@link Closeable} from the registry.
 	 *
 	 * @param closeable instance to remove from the registry.
+	 * @return true if the closeable was previously registered and became unregistered through this call.
 	 */
-	public final void unregisterClosable(C closeable) {
+	public final boolean unregisterCloseable(C closeable) {
 
 		if (null == closeable) {
-			return;
+			return false;
 		}
 
 		synchronized (getSynchronizationLock()) {
-			doUnRegister(closeable, closeableToRef);
+			return doUnRegister(closeable, closeableToRef);
 		}
 	}
 
@@ -137,7 +138,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 	 * Does the actual un-registration of the closeable from the registry map. This should not do any long running or
 	 * potentially blocking operations as is is executed under the registry's lock.
 	 */
-	protected abstract void doUnRegister(@Nonnull C closeable, @Nonnull Map<Closeable, T> closeableMap);
+	protected abstract boolean doUnRegister(@Nonnull C closeable, @Nonnull Map<Closeable, T> closeableMap);
 
 	/**
 	 * Returns the lock on which manipulations to members closeableToRef and closeable must be synchronized.

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
index 41b69c8..f9425f3 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -91,7 +91,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 
 		try {
 
-			closeableRegistry.registerClosable(testCloseable);
+			closeableRegistry.registerCloseable(testCloseable);
 
 			Assert.fail("Closed registry should not accept closeables!");
 
@@ -120,7 +120,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 			return null;
 		}).when(spyCloseable).close();
 
-		closeableRegistry.registerClosable(spyCloseable);
+		closeableRegistry.registerCloseable(spyCloseable);
 
 		Assert.assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables());
 
@@ -138,7 +138,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 		final C testCloseable = spy(createCloseable());
 
 		try {
-			closeableRegistry.registerClosable(testCloseable);
+			closeableRegistry.registerCloseable(testCloseable);
 			Assert.fail("Closed registry should not accept closeables!");
 		}catch (IOException ignore) {
 		}
@@ -214,10 +214,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 
 		@Override
 		public synchronized void close() throws IOException {
-			if (refCount != null) {
-				refCount.decrementAndGet();
-				refCount = null;
-			}
+			refCount.decrementAndGet();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
index eb8d1f4..c3bf6e6 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
@@ -52,7 +52,7 @@ public class CloseableRegistryTest extends AbstractCloseableRegistryTest<Closeab
 			@Override
 			protected void createAndRegisterStream() throws IOException {
 				TestStream testStream = new TestStream(unclosedCounter);
-				registry.registerClosable(testStream);
+				registry.registerCloseable(testStream);
 			}
 		};
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
deleted file mode 100644
index 1aaa473..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.StateObject;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Abstract base class for async IO operations of snapshots against a
- * {@link java.util.zip.CheckedOutputStream}. This includes participating in lifecycle management
- * through a {@link CloseableRegistry}.
- */
-public abstract class AbstractAsyncSnapshotIOCallable<H extends StateObject>
-	extends AbstractAsyncIOCallable<H, CheckpointStreamFactory.CheckpointStateOutputStream> {
-
-	protected final  long checkpointId;
-	protected final  long timestamp;
-
-	protected final CheckpointStreamFactory streamFactory;
-	protected final CloseableRegistry closeStreamOnCancelRegistry;
-	protected final AtomicBoolean open;
-
-	public AbstractAsyncSnapshotIOCallable(
-		long checkpointId,
-		long timestamp,
-		CheckpointStreamFactory streamFactory,
-		CloseableRegistry closeStreamOnCancelRegistry) {
-
-		this.streamFactory = Preconditions.checkNotNull(streamFactory);
-		this.closeStreamOnCancelRegistry = Preconditions.checkNotNull(closeStreamOnCancelRegistry);
-		this.checkpointId = checkpointId;
-		this.timestamp = timestamp;
-		this.open = new AtomicBoolean(false);
-	}
-
-	@Override
-	public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
-		if (checkStreamClosedAndDoTransitionToOpen()) {
-			CheckpointStreamFactory.CheckpointStateOutputStream stream =
-				streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
-			try {
-				closeStreamOnCancelRegistry.registerClosable(stream);
-				return stream;
-			} catch (Exception ex) {
-				open.set(false);
-				throw ex;
-			}
-		} else {
-			throw new IOException("Async snapshot: a checkpoint stream was already opened.");
-		}
-	}
-
-	@Override
-	public void done(boolean canceled) {
-		if (checkStreamOpenAndDoTransitionToClose()) {
-			CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
-			if (stream != null) {
-				closeStreamOnCancelRegistry.unregisterClosable(stream);
-				IOUtils.closeQuietly(stream);
-			}
-		}
-	}
-
-	protected boolean checkStreamClosedAndDoTransitionToOpen() {
-		return open.compareAndSet(false, true);
-	}
-
-	protected boolean checkStreamOpenAndDoTransitionToClose() {
-		return open.compareAndSet(true, false);
-	}
-
-	protected StreamStateHandle closeStreamAndGetStateHandle() throws IOException {
-		if (checkStreamOpenAndDoTransitionToClose()) {
-			final CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
-			try {
-				return stream.closeAndGetHandle();
-			} finally {
-				closeStreamOnCancelRegistry.unregisterClosable(stream);
-			}
-		} else {
-			throw new IOException("Checkpoint stream already closed.");
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java
new file mode 100644
index 0000000..bc0116c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.async;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+
+/**
+ * This abstract class encapsulates the lifecycle and execution strategy for asynchronous operations that use resources.
+ *
+ * @param <V> return type of the asynchronous call.
+ */
+public abstract class AbstractAsyncCallableWithResources<V> implements StoppableCallbackCallable<V> {
+
+	/** Tracks if the stop method was called on this object. */
+	private volatile boolean stopped;
+
+	/** Tracks if call method was executed (only before stop calls). */
+	private volatile boolean called;
+
+	/** Stores a collected exception if there was one during stop. */
+	private volatile Exception stopException;
+
+	public AbstractAsyncCallableWithResources() {
+		this.stopped = false;
+		this.called = false;
+	}
+
+	/**
+	 * This method implements the strategy for the actual IO operation:
+	 * <p>
+	 * 1) Acquire resources asynchronously and atomically w.r.t stopping.
+	 * 2) Performs the operation
+	 * 3) Releases resources.
+	 *
+	 * @return Result of the IO operation, e.g. a deserialized object.
+	 * @throws Exception exception that happened during the call.
+	 */
+	@Override
+	public final V call() throws Exception {
+
+		V result = null;
+		Exception collectedException = null;
+
+		try {
+			synchronized (this) {
+
+				if (stopped) {
+					throw new IOException("Task was already stopped.");
+				}
+
+				called = true;
+				// Get resources in async part, atomically w.r.t. stopping.
+				acquireResources();
+			}
+
+			// The main work is performed here.
+			result = performOperation();
+
+		} catch (Exception ex) {
+			collectedException = ex;
+		} finally {
+
+			try {
+				// Cleanup
+				releaseResources();
+			} catch (Exception relEx) {
+				collectedException = ExceptionUtils.firstOrSuppressed(relEx, collectedException);
+			}
+
+			if (collectedException != null) {
+				throw collectedException;
+			}
+		}
+
+		return result;
+	}
+
+	/**
+	 * Open the IO Handle (e.g. a stream) on which the operation will be performed.
+	 *
+	 * @return the opened IO handle that implements #Closeable
+	 * @throws Exception if there was a problem in acquiring.
+	 */
+	protected abstract void acquireResources() throws Exception;
+
+	/**
+	 * Implements the actual operation.
+	 *
+	 * @return Result of the operation
+	 * @throws Exception if there was a problem in executing the operation.
+	 */
+	protected abstract V performOperation() throws Exception;
+
+	/**
+	 * Releases resources acquired by this object.
+	 *
+	 * @throws Exception if there was a problem in releasing resources.
+	 */
+	protected abstract void releaseResources() throws Exception;
+
+	/**
+	 * This method implements how the operation is stopped. Usually this involves interrupting or closing some
+	 * resources like streams to return from blocking calls.
+	 *
+	 * @throws Exception on problems during the stopping.
+	 */
+	protected abstract void stopOperation() throws Exception;
+
+	/**
+	 * Stops the I/O operation by closing the I/O handle. If an exception is thrown on close, it can be accessed via
+	 * #getStopException().
+	 */
+	@Override
+	public final void stop() {
+
+		synchronized (this) {
+
+			// Make sure that call can not enter execution from here.
+			if (stopped) {
+				return;
+			} else {
+				stopped = true;
+			}
+		}
+
+		if (called) {
+			// Async call is executing -> attempt to stop it and releaseResources() will happen inside the async method.
+			try {
+				stopOperation();
+			} catch (Exception stpEx) {
+				this.stopException = stpEx;
+			}
+		} else {
+			// Async call was not executed, so we also need to releaseResources() here.
+			try {
+				releaseResources();
+			} catch (Exception relEx) {
+				stopException = relEx;
+			}
+		}
+	}
+
+	/**
+	 * Optional callback that subclasses can implement. This is called when the callable method completed, e.g. because
+	 * it finished or was stopped.
+	 */
+	@Override
+	public void done(boolean canceled) {
+		//optional callback hook
+	}
+
+	/**
+	 * True once the async method was called.
+	 */
+	public boolean isCalled() {
+		return called;
+	}
+
+	/**
+	 * Check if the IO operation is stopped
+	 *
+	 * @return true if stop() was called
+	 */
+	@Override
+	public boolean isStopped() {
+		return stopped;
+	}
+
+	/**
+	 * Returns a potential exception that might have been observed while stopping the operation.
+	 */
+	@Override
+	public Exception getStopException() {
+		return stopException;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
deleted file mode 100644
index 1968d40..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.async;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * The abstract class encapsulates the lifecycle and execution strategy for asynchronous IO operations
- *
- * @param <V> return type of the asynchronous call
- * @param <D> type of the IO handle
- */
-public abstract class AbstractAsyncIOCallable<V, D extends Closeable> implements StoppableCallbackCallable<V> {
-
-	private volatile boolean stopped;
-
-	/**
-	 * Closable handle to IO, e.g. an InputStream
-	 */
-	private volatile D ioHandle;
-
-	/**
-	 * Stores exception that might happen during close
-	 */
-	private volatile IOException stopException;
-
-
-	public AbstractAsyncIOCallable() {
-		this.stopped = false;
-	}
-
-	/**
-	 * This method implements the strategy for the actual IO operation:
-	 *
-	 * 1) Open the IO handle
-	 * 2) Perform IO operation
-	 * 3) Close IO handle
-	 *
-	 * @return Result of the IO operation, e.g. a deserialized object.
-	 * @throws Exception exception that happened during the call.
-	 */
-	@Override
-	public V call() throws Exception {
-
-		synchronized (this) {
-			if (isStopped()) {
-				throw new IOException("Task was already stopped. No I/O handle opened.");
-			}
-
-			ioHandle = openIOHandle();
-		}
-
-		try {
-
-			return performOperation();
-
-		} finally {
-			closeIOHandle();
-		}
-
-	}
-
-	/**
-	 * Open the IO Handle (e.g. a stream) on which the operation will be performed.
-	 *
-	 * @return the opened IO handle that implements #Closeable
-	 * @throws Exception
-	 */
-	protected abstract D openIOHandle() throws Exception;
-
-	/**
-	 * Implements the actual IO operation on the opened IO handle.
-	 *
-	 * @return Result of the IO operation
-	 * @throws Exception
-	 */
-	protected abstract V performOperation() throws Exception;
-
-	/**
-	 * Stops the I/O operation by closing the I/O handle. If an exception is thrown on close, it can be accessed via
-	 * #getStopException().
-	 */
-	@Override
-	public void stop() {
-		closeIOHandle();
-	}
-
-	private synchronized void closeIOHandle() {
-
-		if (!stopped) {
-			stopped = true;
-
-			final D handle = ioHandle;
-			if (handle != null) {
-				try {
-					handle.close();
-				} catch (IOException ex) {
-					stopException = ex;
-				}
-			}
-		}
-	}
-
-	/**
-	 * Returns the IO handle.
-	 * @return the IO handle
-	 */
-	protected D getIoHandle() {
-		return ioHandle;
-	}
-
-	/**
-	 * Optional callback that subclasses can implement. This is called when the callable method completed, e.g. because
-	 * it finished or was stopped.
-	 */
-	@Override
-	public void done(boolean canceled) {
-		//optional callback hook
-	}
-
-	/**
-	 * Check if the IO operation is stopped
-	 *
-	 * @return true if stop() was called
-	 */
-	@Override
-	public boolean isStopped() {
-		return stopped;
-	}
-
-	/**
-	 * Returns Exception that might happen on stop.
-	 *
-	 * @return Potential Exception that happened open stopping.
-	 */
-	@Override
-	public IOException getStopException() {
-		return stopException;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
index 560e56a..8698600 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.async;
 
-import java.io.IOException;
-
 /**
  * An asynchronous operation that can be stopped.
  */
@@ -42,6 +40,6 @@ public interface AsyncStoppable {
 	 *
 	 * @return Exception that can happen during stop
 	 */
-	IOException getStopException();
+	Exception getStopException();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index b16ac06..1fb03d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
@@ -34,11 +33,13 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
+
+import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -225,17 +226,37 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		}
 
 		// implementation of the async IO operation, based on FutureTask
-		final AbstractAsyncSnapshotIOCallable<OperatorStateHandle> ioCallable =
-			new AbstractAsyncSnapshotIOCallable<OperatorStateHandle>(
-				checkpointId,
-				timestamp,
-				streamFactory,
-				closeStreamOnCancelRegistry) {
+		final AbstractAsyncCallableWithResources<OperatorStateHandle> ioCallable =
+			new AbstractAsyncCallableWithResources<OperatorStateHandle>() {
+
+				CheckpointStreamFactory.CheckpointStateOutputStream out = null;
+
+				@Override
+				protected void acquireResources() throws Exception {
+					out = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+					closeStreamOnCancelRegistry.registerCloseable(out);
+				}
+
+				@Override
+				protected void releaseResources() throws Exception {
+					if (closeStreamOnCancelRegistry.unregisterCloseable(out)) {
+						IOUtils.closeQuietly(out);
+					}
+				}
+
+				@Override
+				protected void stopOperation() throws Exception {
+					if (closeStreamOnCancelRegistry.unregisterCloseable(out)) {
+						IOUtils.closeQuietly(out);
+					}
+				}
 
 				@Override
 				public OperatorStateHandle performOperation() throws Exception {
 					long asyncStartTime = System.currentTimeMillis();
 
+					CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;
+
 					final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
 						new HashMap<>(registeredStatesDeepCopies.size());
 
@@ -246,8 +267,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 						metaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
 					}
 
-					CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
-					DataOutputView dov = new DataOutputViewStreamWrapper(out);
+					DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
 
 					OperatorBackendSerializationProxy backendSerializationProxy =
 						new OperatorBackendSerializationProxy(metaInfoSnapshots);
@@ -260,25 +280,30 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 						registeredStatesDeepCopies.entrySet()) {
 
 						PartitionableListState<?> value = entry.getValue();
-						long[] partitionOffsets = value.write(out);
+						long[] partitionOffsets = value.write(localOut);
 						OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
 						writtenStatesMetaData.put(
 							entry.getKey(),
 							new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
 					}
 
-					StreamStateHandle stateHandle = closeStreamAndGetStateHandle();
+					OperatorStateHandle retValue = null;
+
+					if (closeStreamOnCancelRegistry.unregisterCloseable(out)) {
+
+						StreamStateHandle stateHandle = out.closeAndGetHandle();
+
+						if (stateHandle != null) {
+							retValue = new OperatorStateHandle(writtenStatesMetaData, stateHandle);
+						}
+					}
 
 					if (asynchronousSnapshots) {
 						LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in thread {} took {} ms.",
 							streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
 					}
 
-					if (stateHandle == null) {
-						return null;
-					}
-
-					return new OperatorStateHandle(writtenStatesMetaData, stateHandle);
+					return retValue;
 				}
 			};
 
@@ -308,7 +333,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			}
 
 			FSDataInputStream in = stateHandle.openInputStream();
-			closeStreamOnCancelRegistry.registerClosable(in);
+			closeStreamOnCancelRegistry.registerCloseable(in);
 
 			ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader();
 
@@ -370,8 +395,9 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 			} finally {
 				Thread.currentThread().setContextClassLoader(restoreClassLoader);
-				closeStreamOnCancelRegistry.unregisterClosable(in);
-				IOUtils.closeQuietly(in);
+				if (closeStreamOnCancelRegistry.unregisterCloseable(in)) {
+					IOUtils.closeQuietly(in);
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index 031d7c7..750d206 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -254,9 +254,10 @@ public class StateInitializationContextImpl implements StateInitializationContex
 						this.offsets = metaOffsets;
 						this.offPos = 0;
 
-						closableRegistry.unregisterClosable(currentStream);
-						IOUtils.closeQuietly(currentStream);
-						currentStream = null;
+						if (closableRegistry.unregisterCloseable(currentStream)) {
+							IOUtils.closeQuietly(currentStream);
+							currentStream = null;
+						}
 
 						return true;
 					}
@@ -308,14 +309,18 @@ public class StateInitializationContextImpl implements StateInitializationContex
 		}
 
 		protected void openCurrentStream() throws IOException {
+
+			Preconditions.checkState(currentStream == null);
+
 			FSDataInputStream stream = currentStateHandle.openInputStream();
-			closableRegistry.registerClosable(stream);
+			closableRegistry.registerCloseable(stream);
 			currentStream = stream;
 		}
 
 		protected void closeCurrentStream() {
-			closableRegistry.unregisterClosable(currentStream);
-			IOUtils.closeQuietly(currentStream);
+			if (closableRegistry.unregisterCloseable(currentStream)) {
+				IOUtils.closeQuietly(currentStream);
+			}
 			currentStream = null;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index 5db0138..6a8a08f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -88,7 +88,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 		CheckpointStreamFactory.CheckpointStateOutputStream cout =
 				streamFactory.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
 
-		closableRegistry.registerClosable(cout);
+		closableRegistry.registerCloseable(cout);
 		return cout;
 	}
 
@@ -120,22 +120,25 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 	}
 
 	private <T extends StreamStateHandle> T closeAndUnregisterStreamToObtainStateHandle(
-			NonClosingCheckpointOutputStream<T> stream) throws IOException {
-		if (null == stream) {
+		NonClosingCheckpointOutputStream<T> stream) throws IOException {
+
+		if (null != stream && closableRegistry.unregisterCloseable(stream.getDelegate())) {
+			return stream.closeAndGetHandle();
+		} else {
 			return null;
 		}
-
-		closableRegistry.unregisterClosable(stream.getDelegate());
-
-		// for now we only support synchronous writing
-		return stream.closeAndGetHandle();
 	}
 
-	private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
+	private <T extends StreamStateHandle> void closeAndUnregisterStream(
+		NonClosingCheckpointOutputStream<T> stream) throws IOException {
+
 		Preconditions.checkNotNull(stream);
 
-		closableRegistry.unregisterClosable(stream.getDelegate());
-		stream.getDelegate().close();
+		CheckpointStreamFactory.CheckpointStateOutputStream delegate = stream.getDelegate();
+
+		if (closableRegistry.unregisterCloseable(delegate)) {
+			delegate.close();
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index e235b96..bf92b34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -35,8 +35,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -325,30 +325,56 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		//--------------------------------------------------- this becomes the end of sync part
 
 		// implementation of the async IO operation, based on FutureTask
-		final AbstractAsyncSnapshotIOCallable<KeyedStateHandle> ioCallable =
-			new AbstractAsyncSnapshotIOCallable<KeyedStateHandle>(
-				checkpointId,
-				timestamp,
-				streamFactory,
-				cancelStreamRegistry) {
+		final AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable =
+			new AbstractAsyncCallableWithResources<KeyedStateHandle>() {
+
+				CheckpointStreamFactory.CheckpointStateOutputStream stream = null;
+
+				@Override
+				protected void acquireResources() throws Exception {
+					stream = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+					cancelStreamRegistry.registerCloseable(stream);
+				}
+
+				@Override
+				protected void releaseResources() throws Exception {
+
+					if (cancelStreamRegistry.unregisterCloseable(stream)) {
+						IOUtils.closeQuietly(stream);
+						stream = null;
+					}
+
+					for (StateTableSnapshot tableSnapshot : cowStateStableSnapshots.values()) {
+						tableSnapshot.release();
+					}
+				}
+
+				@Override
+				protected void stopOperation() throws Exception {
+					if (cancelStreamRegistry.unregisterCloseable(stream)) {
+						IOUtils.closeQuietly(stream);
+						stream = null;
+					}
+				}
 
 				@Override
 				public KeyGroupsStateHandle performOperation() throws Exception {
 					long asyncStartTime = System.currentTimeMillis();
 
-					CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
-					DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
+					CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.stream;
+
+					DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(localStream);
 					serializationProxy.write(outView);
 
 					long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
 
 					for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
 						int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
-						keyGroupRangeOffsets[keyGroupPos] = stream.getPos();
+						keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
 						outView.writeInt(keyGroupId);
 
 						for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
-							OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(stream);
+							OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(localStream);
 							DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
 							kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
 							cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView, keyGroupId);
@@ -356,21 +382,29 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						}
 					}
 
-					final StreamStateHandle streamStateHandle = closeStreamAndGetStateHandle();
+					if (cancelStreamRegistry.unregisterCloseable(stream)) {
 
-					if (asynchronousSnapshots) {
-						LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.",
-							streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
-					}
+						final StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
+						stream = null;
 
-					if (streamStateHandle == null) {
-						return null;
-					}
+						if (asynchronousSnapshots) {
+							LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.",
+								streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
+						}
 
-					KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
-					final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
+						if (streamStateHandle != null) {
 
-					return keyGroupsStateHandle;
+							KeyGroupRangeOffsets offsets =
+								new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+
+							final KeyGroupsStateHandle keyGroupsStateHandle =
+								new KeyGroupsStateHandle(offsets, streamStateHandle);
+
+							return keyGroupsStateHandle;
+						}
+					}
+
+					return null;
 				}
 			};
 
@@ -425,7 +459,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
 			FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
-			cancelStreamRegistry.registerClosable(fsDataInputStream);
+			cancelStreamRegistry.registerCloseable(fsDataInputStream);
 
 			try {
 				DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
@@ -533,8 +567,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					}
 				}
 			} finally {
-				cancelStreamRegistry.unregisterClosable(fsDataInputStream);
-				IOUtils.closeQuietly(fsDataInputStream);
+				if (cancelStreamRegistry.unregisterCloseable(fsDataInputStream)) {
+					IOUtils.closeQuietly(fsDataInputStream);
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6089240..631cdfc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -709,7 +709,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		OperatorStateBackend operatorStateBackend = stateBackend.createOperatorStateBackend(env, opId);
 
 		// let operator state backend participate in the operator lifecycle, i.e. make it responsive to cancelation
-		cancelables.registerClosable(operatorStateBackend);
+		cancelables.registerCloseable(operatorStateBackend);
 
 		// restore if we have some old state
 		if (null != restoreStateHandles) {
@@ -742,7 +742,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				getEnvironment().getTaskKvStateRegistry());
 
 		// let keyed state backend participate in the operator lifecycle, i.e. make it responsive to cancelation
-		cancelables.registerClosable(keyedStateBackend);
+		cancelables.registerCloseable(keyedStateBackend);
 
 		// restore if we have some old state
 		Collection<KeyedStateHandle> restoreKeyedStateHandles = null;
@@ -933,7 +933,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 				owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
 			} finally {
-				owner.cancelables.unregisterClosable(this);
+				owner.cancelables.unregisterCloseable(this);
 				FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 			}
 		}
@@ -1086,7 +1086,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					checkpointMetrics,
 					startAsyncPartNano);
 
-			owner.cancelables.registerClosable(asyncCheckpointRunnable);
+			owner.cancelables.registerCloseable(asyncCheckpointRunnable);
 			owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 9bb91ad..811d700 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1117,7 +1117,7 @@ public class StreamTaskTest extends TestLogger {
 			holder.start();
 			try {
 				// cancellation should try and cancel this
-				getCancelables().registerClosable(holder);
+				getCancelables().registerCloseable(holder);
 
 				// wait till the lock holder has the lock
 				latch.await();

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 9156f34..793e8f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -220,7 +220,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 						environment,
 						operator.getClass().getSimpleName());
 
-					mockTask.getCancelables().registerClosable(osb);
+					mockTask.getCancelables().registerCloseable(osb);
 
 					if (null != stateHandles) {
 						osb.restore(stateHandles);

http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 4d5fa71..829ac93 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -180,7 +180,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			}
 			case ROCKSDB_FULLY_ASYNC: {
 				String rocksDb = tempFolder.newFolder().getAbsolutePath();
-				RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
+				String backups = tempFolder.newFolder().getAbsolutePath();
+				RocksDBStateBackend rdb = new RocksDBStateBackend(new FsStateBackend("file://" + backups));
 				rdb.setDbStoragePath(rocksDb);
 				this.stateBackend = rdb;
 				break;