Posted to commits@flink.apache.org by se...@apache.org on 2016/12/01 14:51:34 UTC

[2/2] flink git commit: [FLINK-5218] [state backends] Eagerly close pending FsCheckpointStateOutputStream on task cancellation

[FLINK-5218] [state backends] Eagerly close pending FsCheckpointStateOutputStream on task cancellation

This fix contains two modifications (a minimal sketch of the resulting pattern follows below):
  1. State backends implement 'Closeable' and register themselves with the task's 'cancelables'
  2. The FsStateBackend tracks all its unclosed FsCheckpointStateOutputStreams and closes them on 'close()'
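
A minimal sketch of the cancellation side of this pattern, using hypothetical names rather than
the actual StreamTask code: the task keeps a set of cancelables and closes each of them when it
is canceled, so a state backend that implements Closeable gets its pending checkpoint I/O
aborted eagerly.

import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch (not the actual StreamTask code): close all registered
// Closeables, including the state backend, when the task is canceled.
class CancelableTaskSketch {

    private final Set<Closeable> cancelables = new HashSet<>();

    void registerCancelable(Closeable closeable) {
        synchronized (cancelables) {
            cancelables.add(closeable);
        }
    }

    void cancel() {
        synchronized (cancelables) {
            for (Closeable closeable : cancelables) {
                try {
                    closeable.close();
                }
                catch (IOException e) {
                    // best effort during cancellation: keep closing the remaining ones
                }
            }
            cancelables.clear();
        }
    }
}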


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59f61bf6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59f61bf6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59f61bf6

Branch: refs/heads/release-1.1
Commit: 59f61bf6cb8351cec9369e2de39c6eeffbda10ea
Parents: e475eb2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 22:38:23 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Dec 1 15:09:43 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |  86 ++++--
 .../runtime/state/AbstractStateBackend.java     |  58 +++-
 .../state/filesystem/FsStateBackend.java        | 159 ++++++++--
 .../state/memory/MemoryStateBackend.java        |   2 +-
 .../FsCheckpointStateOutputStreamTest.java      | 132 ++++++++-
 .../state/FsStateBackendClosingTest.java        |  65 +++++
 .../streaming/runtime/tasks/StreamTask.java     |   7 +
 .../runtime/tasks/BlockingCheckpointsTest.java  | 290 +++++++++++++++++++
 .../streaming/runtime/StateBackendITCase.java   |   2 +-
 9 files changed, 731 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index deba9f9..0412a4a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -312,50 +312,74 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
-	public void dispose() {
-		super.dispose();
-		nonPartitionedStateBackend.dispose();
+	public void dispose() throws Exception {
+		Throwable exception = null;
 
-		// we have to lock because we might have an asynchronous checkpoint going on
-		synchronized (dbCleanupLock) {
-			if (db != null) {
-				if (this.dbOptions != null) {
-					this.dbOptions.dispose();
-					this.dbOptions = null;
-				}
-
-				for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
-					column.f0.dispose();
-				}
+		// make sure the individual states are disposed
+		try {
+			super.dispose();
+		}
+		catch (Throwable t) {
+			exception = t;
+		}
 
-				db.dispose();
-				db = null;
+		try {
+			nonPartitionedStateBackend.dispose();
+		}
+		catch (Throwable t) {
+			if (exception == null) {
+				exception = t;
+			} else {
+				exception.addSuppressed(t);
 			}
 		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		nonPartitionedStateBackend.close();
 
 		// we have to lock because we might have an asynchronous checkpoint going on
-		synchronized (dbCleanupLock) {
-			if (db != null) {
-				if (this.dbOptions != null) {
-					this.dbOptions.dispose();
-					this.dbOptions = null;
-				}
+		// this must also happen in any case, regardless of earlier exceptions
+		try {
+			synchronized (dbCleanupLock) {
+				if (db != null) {
+					if (this.dbOptions != null) {
+						this.dbOptions.dispose();
+						this.dbOptions = null;
+					}
+
+					for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
+						column.f0.dispose();
+					}
 
-				for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
-					column.f0.dispose();
+					db.dispose();
+					db = null;
 				}
+			}
+		}
+		catch (Throwable t) {
+			if (exception == null) {
+				exception = t;
+			} else {
+				exception.addSuppressed(t);
+			}
+		}
 
-				db.dispose();
-				db = null;
+		if (exception != null) {
+			if (exception instanceof Exception) {
+				throw (Exception) exception;
+			} else if (exception instanceof Error) {
+				throw (Error) exception;
+			} else {
+				throw new Exception(exception.getMessage(), exception);
 			}
 		}
 	}
 
+	@Override
+	public void close() throws IOException {
+		// we only close all I/O streams here and do not yet dispose of the native resources
+		// otherwise this can lead to SEGFAULT problems
+		// native resources must only be released in the 'dispose()' method.
+		nonPartitionedStateBackend.close();
+	}
+
 	private File getDbPath(String stateName) {
 		return new File(new File(new File(getNextStoragePath(), jobId.toString()), operatorIdentifier), stateName);
 	}

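The reworked dispose() above (and AbstractStateBackend.dispose() below) follows a
collect-and-rethrow cleanup idiom: every cleanup step runs even if an earlier one failed, the
first failure is remembered, and later failures are attached via addSuppressed(). A minimal,
hypothetical sketch of that idiom, not the actual Flink code:

import java.util.List;

// Hypothetical sketch of the collect-and-rethrow cleanup idiom used in dispose().
class CleanupSketch {

    static void disposeAll(List<AutoCloseable> resources) throws Exception {
        Throwable exception = null;

        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            }
            catch (Throwable t) {
                if (exception == null) {
                    exception = t;              // remember the first failure
                } else {
                    exception.addSuppressed(t); // attach subsequent failures
                }
            }
        }

        if (exception != null) {
            if (exception instanceof Exception) {
                throw (Exception) exception;
            } else if (exception instanceof Error) {
                throw (Error) exception;
            } else {
                // a Throwable that is neither Exception nor Error: wrap it
                throw new Exception(exception.getMessage(), exception);
            }
        }
    }
}
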
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index b86688b..ab9854c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -39,6 +39,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.execution.Environment;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -51,8 +52,8 @@ import java.util.Map;
 /**
  * A state backend defines how state is stored and snapshotted during checkpoints.
  */
-public abstract class AbstractStateBackend implements java.io.Serializable {
-	
+public abstract class AbstractStateBackend implements java.io.Serializable, Closeable {
+
 	private static final long serialVersionUID = 4620413814639220247L;
 
 	protected transient TypeSerializer<?> keySerializer;
@@ -102,23 +103,61 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 	public abstract void disposeAllStateForCurrentJob() throws Exception;
 
 	/**
-	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
-	 * checkpoint data.
+	 * Closes the state backend, dropping and aborting all I/O operations that are currently
+	 * pending.
 	 *
-	 * @throws Exception Exceptions can be forwarded and will be logged by the system
+	 * @throws IOException Exceptions can be forwarded and will be logged by the system
 	 */
-	public abstract void close() throws Exception;
+	public abstract void close() throws IOException;
+
+	/**
+	 * Releases all resources held by this state backend.
+	 * 
+	 * <p>This method must make sure that all resources are disposed, even if an exception happens
+	 * on the way.
+	 * 
+	 * @throws Exception This method should report exceptions that occur.
+	 */
+	public void dispose() throws Exception {
+		Throwable exception = null;
+
+		// make sure things are closed
+		try {
+			close();
+		}
+		catch (Throwable t) {
+			exception = t;
+		}
 
-	public void dispose() {
+		// now actually dispose things
 		lastName = null;
 		lastState = null;
 		if (keyValueStates != null) {
 			for (KvState<?, ?, ?, ?, ?> state : keyValueStates) {
-				state.dispose();
+				try {
+					state.dispose();
+				}
+				catch (Throwable t) {
+					if (exception == null) {
+						exception = t;
+					} else {
+						exception.addSuppressed(t);
+					}
+				}
 			}
 		}
 		keyValueStates = null;
 		keyValueStatesByName = null;
+
+		if (exception != null) {
+			if (exception instanceof Exception) {
+				throw (Exception) exception;
+			} else if (exception instanceof Error) {
+				throw (Error) exception;
+			} else {
+				throw new Exception(exception.getMessage(), exception);
+			}
+		}
 	}
 	
 	// ------------------------------------------------------------------------
@@ -444,6 +483,9 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 		 * @throws IOException Thrown, if the stream cannot be closed.
 		 */
 		public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
+			// we do not flush() here, because that would force the 'CheckpointStateOutputStream'
+			// to spill to files, even when it could stay in a 'small chunk' memory handle.
+			// the 'DataOutputViewStreamWrapper' does not buffer data anyway
 			return new DataInputViewHandle(out.closeAndGetHandle());
 		}
 

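The new javadoc above splits the backend lifecycle into two calls: close() drops and aborts
pending I/O, so it is safe to invoke from a canceling thread, while dispose() releases all
resources and invokes close() first as a safety net. A hypothetical caller-side sketch of that
split; the method names onCancel/onTaskExit are illustrative, not Flink API:

import java.io.IOException;
import org.apache.flink.runtime.state.AbstractStateBackend;

// Hypothetical caller-side sketch of the close()/dispose() split.
class BackendLifecycleSketch {

    // canceler thread: abort pending checkpoint I/O so blocked writers wake up
    static void onCancel(AbstractStateBackend backend) throws IOException {
        backend.close();
    }

    // task thread, once the invokable has returned: release all remaining resources;
    // dispose() calls close() first as a safety net
    static void onTaskExit(AbstractStateBackend backend) throws Exception {
        backend.dispose();
    }
}
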
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 8a8a26d..446f3ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -33,9 +33,10 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,9 +45,13 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The file state backend is a state backend that stores the state of streaming jobs in a file system.
  *
@@ -86,6 +91,10 @@ public class FsStateBackend extends AbstractStateBackend {
 	/** Cached handle to the file system for file operations */
 	private transient FileSystem filesystem;
 
+	/** Set of currently open streams */
+	private transient HashSet<FsCheckpointStateOutputStream> openStreams;
+
+	private transient volatile boolean closed;
 
 	/**
 	 * Creates a new state backend that stores its checkpoint data in the file system and location
@@ -236,9 +245,11 @@ public class FsStateBackend extends AbstractStateBackend {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void initializeForJob(Environment env,
-		String operatorIdentifier,
-		TypeSerializer<?> keySerializer) throws Exception {
+	public void initializeForJob(
+			Environment env,
+			String operatorIdentifier,
+			TypeSerializer<?> keySerializer) throws Exception {
+
 		super.initializeForJob(env, operatorIdentifier, keySerializer);
 
 		Path dir = new Path(basePath, env.getJobID().toString());
@@ -249,6 +260,7 @@ public class FsStateBackend extends AbstractStateBackend {
 		filesystem.mkdirs(dir);
 
 		checkpointDirectory = dir;
+		openStreams = new HashSet<>();
 	}
 
 	@Override
@@ -267,7 +279,42 @@ public class FsStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
-	public void close() throws Exception {}
+	public void close() throws IOException {
+		closed = true;
+
+		// cache a copy on the heap for safety 
+		final HashSet<FsCheckpointStateOutputStream> openStreams = this.openStreams;
+		if (openStreams != null) {
+
+			// we need to draw a copy of the set, since the closing concurrently modifies the set
+			final ArrayList<FsCheckpointStateOutputStream> streams;
+
+			//noinspection SynchronizationOnLocalVariableOrMethodParameter
+			synchronized (openStreams) {
+				streams = new ArrayList<>(openStreams);
+				openStreams.clear();
+			}
+
+			// close all the streams, collect exceptions and record all but the first as suppressed
+			Throwable exception = null;
+			for (FsCheckpointStateOutputStream stream : streams) {
+				try {
+					stream.close();
+				}
+				catch (Throwable t) {
+					if (exception == null) {
+						exception = t;
+					} else {
+						exception.addSuppressed(t);
+					}
+				}
+			}
+
+			if (exception != null) {
+				ExceptionUtils.rethrowIOException(exception);
+			}
+		}
+	}
 
 	// ------------------------------------------------------------------------
 	//  state backend operations
@@ -299,15 +346,22 @@ public class FsStateBackend extends AbstractStateBackend {
 			S state, long checkpointID, long timestamp) throws Exception
 	{
 		checkFileSystemInitialized();
-		
+
 		Path checkpointDir = createCheckpointDirPath(checkpointID);
 		int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
 
-		FsCheckpointStateOutputStream stream = 
-			new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
-		
-		try (ObjectOutputStream os = new ObjectOutputStream(stream)) {
+		try (FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+				checkpointDir, filesystem, openStreams, bufferSize, fileStateThreshold))
+		{
+			// perform the closing double-check AFTER! the creation of the stream
+			if (closed) {
+				throw new IOException("closed");
+			}
+
+			ObjectOutputStream os = new ObjectOutputStream(stream);
 			os.writeObject(state);
+			os.flush();
+
 			return stream.closeAndGetHandle().toSerializableHandle();
 		}
 	}
@@ -318,7 +372,16 @@ public class FsStateBackend extends AbstractStateBackend {
 
 		Path checkpointDir = createCheckpointDirPath(checkpointID);
 		int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
-		return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
+		FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+				checkpointDir, filesystem, openStreams, bufferSize, fileStateThreshold);
+
+		// perform the closing double-check AFTER! the creation of the stream
+		if (closed) {
+			stream.close();
+			throw new IOException("closed");
+		}
+
+		return stream;
 	}
 
 	// ------------------------------------------------------------------------
@@ -426,29 +489,40 @@ public class FsStateBackend extends AbstractStateBackend {
 		private int pos;
 
 		private FSDataOutputStream outStream;
-		
+
 		private final int localStateThreshold;
 
 		private final Path basePath;
 
 		private final FileSystem fs;
-		
+
+		private final HashSet<FsCheckpointStateOutputStream> openStreams;
+
 		private Path statePath;
-		
-		private boolean closed;
+
+		private volatile boolean closed;
 
 		public FsCheckpointStateOutputStream(
-					Path basePath, FileSystem fs,
-					int bufferSize, int localStateThreshold)
+					Path basePath,
+					FileSystem fs,
+					HashSet<FsCheckpointStateOutputStream> openStreams,
+					int bufferSize,
+					int localStateThreshold)
 		{
 			if (bufferSize < localStateThreshold) {
 				throw new IllegalArgumentException();
 			}
-			
+
 			this.basePath = basePath;
 			this.fs = fs;
+			this.openStreams = checkNotNull(openStreams);
 			this.writeBuffer = new byte[bufferSize];
 			this.localStateThreshold = localStateThreshold;
+
+			//noinspection SynchronizationOnLocalVariableOrMethodParameter
+			synchronized (openStreams) {
+				openStreams.add(this);
+			}
 		}
 
 
@@ -519,6 +593,9 @@ public class FsStateBackend extends AbstractStateBackend {
 					pos = 0;
 				}
 			}
+			else {
+				throw new IOException("stream is closed");
+			}
 		}
 
 		/**
@@ -530,6 +607,16 @@ public class FsStateBackend extends AbstractStateBackend {
 		public void close() {
 			if (!closed) {
 				closed = true;
+
+				// make sure that the write() methods cannot succeed any more
+				pos = writeBuffer.length;
+
+				// remove the stream from the open streams set
+				synchronized (openStreams) {
+					openStreams.remove(this);
+				}
+
+				// close all resources
 				if (outStream != null) {
 					try {
 						outStream.close();
@@ -551,15 +638,24 @@ public class FsStateBackend extends AbstractStateBackend {
 		public StreamStateHandle closeAndGetHandle() throws IOException {
 			synchronized (this) {
 				if (!closed) {
+
+					// remove the stream from the open streams set
+					synchronized (openStreams) {
+						openStreams.remove(this);
+					}
+
+					// close all resources
 					if (outStream == null && pos <= localStateThreshold) {
 						closed = true;
 						byte[] bytes = Arrays.copyOf(writeBuffer, pos);
+						pos = writeBuffer.length;
 						return new ByteStreamStateHandle(bytes);
 					}
 					else {
 						flush();
 						outStream.close();
 						closed = true;
+						pos = writeBuffer.length;
 						return new FileStreamStateHandle(statePath);
 					}
 				}
@@ -577,9 +673,17 @@ public class FsStateBackend extends AbstractStateBackend {
 		public Path closeAndGetPath() throws IOException {
 			synchronized (this) {
 				if (!closed) {
-					closed = true;
+
+					// remove the stream from the open streams set
+					synchronized (openStreams) {
+						openStreams.remove(this);
+					}
+
+					// close all resources
 					flush();
 					outStream.close();
+					closed = true;
+					pos = writeBuffer.length;
 					return statePath;
 				}
 				else {
@@ -587,5 +691,22 @@ public class FsStateBackend extends AbstractStateBackend {
 				}
 			}
 		}
+
+		public boolean isClosed() {
+			return closed;
+		}
+
+		// we need referential identity on the streams for the closing set to work
+		// properly, so we implement that via final methods here
+
+		@Override
+		public final int hashCode() {
+			return super.hashCode();
+		}
+
+		@Override
+		public final boolean equals(Object obj) {
+			return this == obj;
+		}
 	}
 }

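The FsStateBackend changes above boil down to a small registry of open streams plus a volatile
'closed' flag that is re-checked after each stream is created, so a stream created concurrently
with close() is still aborted. A stripped-down, hypothetical sketch of that idiom (the names are
illustrative, not the actual classes):

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;

// Hypothetical sketch: streams register themselves in a shared set, the backend's close()
// drains and closes that set, and createStream() re-checks the 'closed' flag AFTER the
// stream is created so a concurrent close() cannot miss it.
class StreamRegistrySketch implements Closeable {

    private final HashSet<Closeable> openStreams = new HashSet<>();
    private volatile boolean closed;

    Closeable createStream() throws IOException {
        final Closeable stream = new Closeable() {
            @Override
            public void close() {
                // release the underlying resource; a real stream also removes
                // itself from 'openStreams' here
            }
        };

        synchronized (openStreams) {
            openStreams.add(stream);
        }

        // double-check AFTER creation: if close() ran concurrently, abort the stream here
        if (closed) {
            stream.close();
            throw new IOException("closed");
        }
        return stream;
    }

    @Override
    public void close() throws IOException {
        closed = true;

        // drain the set under the lock, then close the streams outside of it
        final ArrayList<Closeable> streams;
        synchronized (openStreams) {
            streams = new ArrayList<>(openStreams);
            openStreams.clear();
        }
        for (Closeable stream : streams) {
            stream.close(); // a full implementation collects and suppresses exceptions
        }
    }
}
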
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 7b9d21b..b155244 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -78,7 +78,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
-	public void close() throws Exception {}
+	public void close() {}
 
 	// ------------------------------------------------------------------------
 	//  State backend operations

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
index 5964b72..3aba9e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
@@ -22,34 +22,36 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend.FsCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashSet;
 import java.util.Random;
 
 import static org.junit.Assert.*;
 
 public class FsCheckpointStateOutputStreamTest {
-	
+
 	/** The temp dir, obtained in a platform neutral way */
 	private static final Path TEMP_DIR_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI());
-	
-	
+
+
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrongParameters() {
 		// this should fail
 		new FsStateBackend.FsCheckpointStateOutputStream(
-			TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 5000);
+			TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 4000, 5000);
 	}
 
-
 	@Test
 	public void testEmptyState() throws Exception {
 		AbstractStateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
-			TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512);
+			TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 1024, 512);
 		
 		StreamStateHandle handle = stream.closeAndGetHandle();
 		assertTrue(handle instanceof ByteStreamStateHandle);
@@ -57,7 +59,101 @@ public class FsCheckpointStateOutputStreamTest {
 		InputStream inStream = handle.getState(ClassLoader.getSystemClassLoader());
 		assertEquals(-1, inStream.read());
 	}
-	
+
+	@Test
+	public void testCloseAndGetPath() throws Exception {
+		FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH,
+				FileSystem.getLocalFileSystem(),
+				new HashSet<FsCheckpointStateOutputStream>(),
+				1024,
+				512);
+
+		stream.write(1);
+
+		Path path = stream.closeAndGetPath();
+		assertNotNull(path);
+
+		// cleanup
+		FileSystem.getLocalFileSystem().delete(path, false);
+	}
+
+	@Test
+	public void testWriteFailsFastWhenClosed() throws Exception {
+		final HashSet<FsCheckpointStateOutputStream> openStreams = new HashSet<>();
+
+		FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+		FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+		FsCheckpointStateOutputStream stream3 = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+		assertFalse(stream1.isClosed());
+		assertFalse(stream2.isClosed());
+		assertFalse(stream3.isClosed());
+
+		// simple close
+		stream1.close();
+
+		// close with handle
+		StreamStateHandle handle = stream2.closeAndGetHandle();
+
+		// close with path
+		Path path = stream3.closeAndGetPath();
+
+		assertTrue(stream1.isClosed());
+		assertTrue(stream2.isClosed());
+		assertTrue(stream3.isClosed());
+
+		validateStreamNotWritable(stream1);
+		validateStreamNotWritable(stream2);
+		validateStreamNotWritable(stream3);
+
+		// clean up
+		handle.discardState();
+		FileSystem.getLocalFileSystem().delete(path, false);
+	}
+
+	@Test
+	public void testAddAndRemoveFromOpenStreamsSet() throws Exception {
+		final HashSet<FsCheckpointStateOutputStream> openStreams = new HashSet<>();
+
+		FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+		FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+		FsCheckpointStateOutputStream stream3 = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+		assertTrue(openStreams.contains(stream1));
+		assertTrue(openStreams.contains(stream2));
+		assertTrue(openStreams.contains(stream3));
+		assertEquals(3, openStreams.size());
+
+		// simple close
+		stream1.close();
+
+		// close with handle
+		StreamStateHandle handle = stream2.closeAndGetHandle();
+
+		// close with path
+		Path path = stream3.closeAndGetPath();
+
+		assertFalse(openStreams.contains(stream1));
+		assertFalse(openStreams.contains(stream2));
+		assertFalse(openStreams.contains(stream3));
+		assertEquals(0, openStreams.size());
+
+		// clean up
+		handle.discardState();
+		FileSystem.getLocalFileSystem().delete(path, false);
+	}
+
 	@Test
 	public void testStateBlowMemThreshold() throws Exception {
 		runTest(222, 999, 512, false);
@@ -72,16 +168,17 @@ public class FsCheckpointStateOutputStreamTest {
 	public void testStateAboveMemThreshold() throws Exception {
 		runTest(576446, 259, 17, true);
 	}
-	
+
 	@Test
 	public void testZeroThreshold() throws Exception {
 		runTest(16678, 4096, 0, true);
 	}
-	
+
 	private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
 		AbstractStateBackend.CheckpointStateOutputStream stream =
 			new FsStateBackend.FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), bufferSize, threshold);
+					TEMP_DIR_PATH, FileSystem.getLocalFileSystem(),
+					new HashSet<FsCheckpointStateOutputStream>(), bufferSize, threshold);
 		
 		Random rnd = new Random();
 		byte[] original = new byte[numBytes];
@@ -125,4 +222,19 @@ public class FsCheckpointStateOutputStreamTest {
 		
 		handle.discardState();
 	}
+
+	private void validateStreamNotWritable(FsCheckpointStateOutputStream stream) {
+		try {
+			stream.write(1);
+			fail();
+		} catch (IOException e) {
+			// expected
+		}
+		try {
+			stream.write(new byte[4], 1, 2);
+			fail();
+		} catch (IOException e) {
+			// expected
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
new file mode 100644
index 0000000..6df488d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend.FsCheckpointStateOutputStream;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FsStateBackendClosingTest {
+
+	@Test
+	public void testStateBackendClosesStreams() throws Exception {
+		final URI tempFolder = new File(EnvironmentInformation.getTemporaryFileDirectory()).toURI();
+		final FsStateBackend backend = new FsStateBackend(tempFolder);
+
+		backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+
+		FsCheckpointStateOutputStream stream = backend.createCheckpointStateOutputStream(17L, 12345L);
+
+		// stream is open, this should succeed
+		assertFalse(stream.isClosed());
+		stream.write(1);
+
+		// close the backend - that should close the stream
+		backend.close();
+
+		assertTrue(stream.isClosed());
+
+		try {
+			stream.write(2);
+			fail("stream is closed, 'write(int)' should fail with an exception");
+		}
+		catch (IOException e) {
+			// expected
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index aaaead0..99df060 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -772,7 +772,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					}
 			}
 		}
+
 		stateBackend.initializeForJob(getEnvironment(), operatorIdentifier, keySerializer);
+
+		// make sure the state backend is closed eagerly in case of cancellation
+		synchronized (cancelables) {
+			cancelables.add(stateBackend);
+		}
+
 		return stateBackend;
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
new file mode 100644
index 0000000..ed993c7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This test checks that task checkpoints that block and do not react to thread interrupts
+ * are still aborted by task cancellation, which eagerly closes the pending checkpoint
+ * output streams.
+ */
+public class BlockingCheckpointsTest {
+
+	private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();
+
+	@Test
+	public void testBlockingNonInterruptibleCheckpoint() throws Exception {
+
+		Configuration taskConfig = new Configuration();
+		StreamConfig cfg = new StreamConfig(taskConfig);
+		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+		cfg.setStreamOperator(new TestOperator());
+		cfg.setStateBackend(new LockingStreamStateBackend());
+
+		Task task = createTask(taskConfig);
+
+		// start the task and wait until it is in "restore"
+		task.startTaskThread();
+		IN_CHECKPOINT_LATCH.await();
+
+		// cancel the task and wait. unless cancellation properly closes
+		// the streams, this will never terminate
+		task.cancelExecution();
+		task.getExecutingThread().join();
+
+		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+		assertNull(task.getFailureCause());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static Task createTask(Configuration taskConfig) throws IOException {
+
+		JobInformation jobInformation = new JobInformation(
+			new JobID(),
+			"test job name",
+			new SerializedValue<>(new ExecutionConfig()),
+			new Configuration(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList());
+
+		TaskInformation taskInformation = new TaskInformation(
+			new JobVertexID(),
+			"test task name",
+			1,
+			TestStreamTask.class.getName(),
+			taskConfig);
+
+		return new Task(
+			jobInformation,
+			taskInformation,
+			new ExecutionAttemptID(),
+			0,
+			0,
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			0,
+			null,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			mock(NetworkEnvironment.class),
+			mock(BroadcastVariableManager.class),
+			mock(ActorGateway.class),
+			mock(ActorGateway.class),
+			new FiniteDuration(10, TimeUnit.SECONDS),
+			new FallbackLibraryCacheManager(),
+			new FileCache(new Configuration()),
+			new TaskManagerRuntimeInfo(
+					"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
+			new UnregisteredTaskMetricsGroup());
+		
+	}
+
+	// ------------------------------------------------------------------------
+	//  state backend with locking output stream
+	// ------------------------------------------------------------------------
+
+	private static class LockingStreamStateBackend extends AbstractStateBackend {
+
+		private static final long serialVersionUID = 1L;
+
+		private final LockingOutputStream out = new LockingOutputStream();
+
+		@Override
+		public void disposeAllStateForCurrentJob() {}
+
+		@Override
+		public void close() throws IOException {
+			out.close();
+		}
+
+		@Override
+		public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+			return out;
+		}
+
+		@Override
+		protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, long checkpointID, long timestamp) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	private static final class LockingOutputStream extends CheckpointStateOutputStream implements Serializable {
+		private static final long serialVersionUID = 1L;
+		
+		private final SerializableObject lock = new SerializableObject();
+		private volatile boolean closed;
+
+		@Override
+		public StreamStateHandle closeAndGetHandle() throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void write(int b) throws IOException {
+			// this needs to not react to interrupts until the handle is closed
+			synchronized (lock) {
+				while (!closed) {
+					try {
+						lock.wait();
+					}
+					catch (InterruptedException ignored) {}
+				}
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			synchronized (lock) {
+				closed = true;
+				lock.notifyAll();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test source operator that calls into the locking checkpoint output stream
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class TestOperator extends StreamFilter<Object> {
+		private static final long serialVersionUID = 1L;
+
+		public TestOperator() {
+			super(new FilterFunction<Object>() {
+				@Override
+				public boolean filter(Object value) {
+					return false;
+				}
+			});
+		}
+
+		@Override
+		public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+			AbstractStateBackend stateBackend = getStateBackend();
+			CheckpointStateOutputStream outStream = stateBackend.createCheckpointStateOutputStream(checkpointId, timestamp);
+
+			IN_CHECKPOINT_LATCH.trigger();
+
+			// this should lock
+			outStream.write(1);
+
+			// this should be unreachable
+			return null;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  stream task that simply triggers a checkpoint
+	// ------------------------------------------------------------------------
+
+	public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {
+
+		@Override
+		public void init() {}
+
+		@Override
+		protected void run() throws Exception {
+			triggerCheckpointOnBarrier(11L, System.currentTimeMillis());
+		}
+
+		@Override
+		protected void cleanup() {}
+
+		@Override
+		protected void cancelTask() {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 6288946..4eb8b4a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -104,7 +104,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 		public void disposeAllStateForCurrentJob() throws Exception {}
 
 		@Override
-		public void close() throws Exception {}
+		public void close() {}
 
 		@Override
 		protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {