You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/01 14:51:33 UTC

[1/2] flink git commit: [FLINK-5216] [checkpoints] 'Min Time Between Checkpoints' references timestamp after checkpoint

Repository: flink
Updated Branches:
  refs/heads/release-1.1 7b5d769ad -> 59f61bf6c


[FLINK-5216] [checkpoints] 'Min Time Between Checkpoints' references timestamp after checkpoint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e475eb2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e475eb2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e475eb2d

Branch: refs/heads/release-1.1
Commit: e475eb2d9705b8948ce862f25adf91e25a4948b0
Parents: 7b5d769
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 20:31:07 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Dec 1 14:01:40 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  43 ++++--
 .../checkpoint/CheckpointCoordinatorTest.java   | 133 +++++++++----------
 2 files changed, 97 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e475eb2d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 24cc3cb..0d09922 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -117,7 +117,7 @@ public class CheckpointCoordinator {
 
 	/** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
 	 * enforce minimum processing time between checkpoint attempts */
-	private final long minPauseBetweenCheckpoints;
+	private final long minPauseBetweenCheckpointsNanos;
 
 	/** The maximum number of checkpoints that may be in progress at the same time */
 	private final int maxConcurrentCheckpointAttempts;
@@ -133,7 +133,8 @@ public class CheckpointCoordinator {
 
 	private ScheduledTrigger currentPeriodicTrigger;
 
-	private long lastTriggeredCheckpoint;
+	/** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */
+	private long lastCheckpointCompletionNanos;
 
 	/** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
 	 * Non-volatile, because only accessed in synchronized scope */
@@ -201,10 +202,16 @@ public class CheckpointCoordinator {
 		checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
 		checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
 
+		// max "in between duration" can be one year - this is to prevent numeric overflows
+		if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
+			minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
+			LOG.warn("Reducing minimum pause between checkpoints to " + minPauseBetweenCheckpoints + " ms (1 year)");
+		}
+
 		this.job = checkNotNull(job);
 		this.baseInterval = baseInterval;
 		this.checkpointTimeout = checkpointTimeout;
-		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+		this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1_000_000;
 		this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
 		this.tasksToTrigger = checkNotNull(tasksToTrigger);
 		this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
@@ -417,13 +424,16 @@ public class CheckpointCoordinator {
 				return false;
 			}
 
-			//make sure the minimum interval between checkpoints has passed
-			if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp && baseInterval != Long.MAX_VALUE) {
+			// make sure the minimum interval between checkpoints has passed
+			final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+			final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
+
+			if (durationTillNextMillis > 0 && baseInterval != Long.MAX_VALUE) {
 				if (currentPeriodicTrigger != null) {
 					currentPeriodicTrigger.cancel();
 				}
 				currentPeriodicTrigger = new ScheduledTrigger();
-				timer.scheduleAtFixedRate(currentPeriodicTrigger, minPauseBetweenCheckpoints, baseInterval);
+				timer.scheduleAtFixedRate(currentPeriodicTrigger, durationTillNextMillis, baseInterval);
 				return false;
 			}
 		}
@@ -458,8 +468,6 @@ public class CheckpointCoordinator {
 		}
 
 		// we will actually trigger this checkpoint!
-
-		lastTriggeredCheckpoint = timestamp;
 		final long checkpointID;
 		if (nextCheckpointId < 0) {
 			try {
@@ -532,6 +540,19 @@ public class CheckpointCoordinator {
 					return false;
 				}
 
+				// make sure the minimum interval between checkpoints has passed
+				final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+				final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
+
+				if (durationTillNextMillis > 0 && baseInterval != Long.MAX_VALUE) {
+					if (currentPeriodicTrigger != null) {
+						currentPeriodicTrigger.cancel();
+					}
+					currentPeriodicTrigger = new ScheduledTrigger();
+					timer.scheduleAtFixedRate(currentPeriodicTrigger, durationTillNextMillis, baseInterval);
+					return false;
+				}
+
 				pendingCheckpoints.put(checkpointID, checkpoint);
 				timer.schedule(canceller, checkpointTimeout);
 			}
@@ -682,8 +703,10 @@ public class CheckpointCoordinator {
 
 				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize(), null)) {
 					case SUCCESS:
-						// TODO: Give KV-state to the acknowledgeTask method
+
 						if (checkpoint.isFullyAcknowledged()) {
+
+							lastCheckpointCompletionNanos = System.nanoTime();
 							completed = checkpoint.finalizeCheckpoint();
 
 							completedCheckpointStore.addCheckpoint(completed);
@@ -805,6 +828,8 @@ public class CheckpointCoordinator {
 	 * <p>NOTE: The caller of this method must hold the lock when invoking the method!
 	 */
 	private void triggerQueuedRequests() {
+		assert Thread.holdsLock(lock);
+
 		if (triggerRequestQueued) {
 			triggerRequestQueued = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e475eb2d/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 35cce85..9159711 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -41,6 +41,8 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -50,6 +52,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -1193,102 +1196,92 @@ public class CheckpointCoordinatorTest {
 	 * another is triggered.
 	 */
 	@Test
-	public void testMinInterval() {
-		try {
-			final JobID jid = new JobID();
+	public void testMinTimeBetweenCheckpointsInterval() throws Exception {
+		final JobID jid = new JobID();
 
-			// create some mock execution vertices and trigger some checkpoint
-			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
-			ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+		// create some mock execution vertices and trigger some checkpoint
+		final ExecutionAttemptID attemptID = new ExecutionAttemptID();
+		ExecutionVertex vertex = mockExecutionVertex(attemptID);
 
-			final AtomicInteger numCalls = new AtomicInteger();
+		final BlockingQueue<TriggerCheckpoint> calls = new LinkedBlockingQueue<>();
 
-			doAnswer(new Answer<Void>() {
-				@Override
-				public Void answer(InvocationOnMock invocation) throws Throwable {
-					if (invocation.getArguments()[0] instanceof TriggerCheckpoint) {
-						numCalls.incrementAndGet();
-					}
-					return null;
-				}
-			}).when(vertex1).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				calls.add((TriggerCheckpoint) invocation.getArguments()[0]);
+				return null;
+			}
+		}).when(vertex).sendMessageToCurrentExecution(isA(TriggerCheckpoint.class), eq(attemptID));
 
-			CheckpointCoordinator coord = new CheckpointCoordinator(
-				jid,
-				10,		// periodic interval is 10 ms
-				200000,	// timeout is very long (200 s)
-				500,	// 500ms delay between checkpoints
-				10,
-				42,
-				new ExecutionVertex[] { vertex1 },
-				new ExecutionVertex[] { vertex1 },
-				new ExecutionVertex[] { vertex1 },
-				cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker(),
-				TestExecutors.directExecutor());
+		final long delay = 50;
 
+		final CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			2,           // periodic interval is 2 ms
+			200_000,     // timeout is very long (200 s)
+			delay,       // 50ms delay between checkpoints
+			1,
+			42,
+			new ExecutionVertex[] { vertex },
+			new ExecutionVertex[] { vertex },
+			new ExecutionVertex[] { vertex },
+			cl,
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(2, cl),
+			RecoveryMode.STANDALONE,
+			new DisabledCheckpointStatsTracker(),
+			TestExecutors.directExecutor());
+
+		try {
 			coord.startCheckpointScheduler();
 
-			//wait until the first checkpoint was triggered
-			for (int x=0; x<20; x++) {
-				Thread.sleep(100);
-				if (numCalls.get() > 0) {
-					break;
-				}
-			}
+			// wait until the first checkpoint was triggered
+			TriggerCheckpoint call = calls.take();
+			assertEquals(1L, call.getCheckpointId());
+			assertEquals(jid, call.getJob());
+			assertEquals(attemptID, call.getTaskExecutionId());
 
-			if (numCalls.get() == 0) {
-				fail("No checkpoint was triggered within the first 2000 ms.");
-			}
-			
-			long start = System.currentTimeMillis();
-
-			for (int x = 0; x < 20; x++) {
-				Thread.sleep(100);
-				int triggeredCheckpoints = numCalls.get();
-				long curT = System.currentTimeMillis();
-
-				/**
-				 * Within a given time-frame T only T/500 checkpoints may be triggered due to the configured minimum
-				 * interval between checkpoints. This value however does not not take the first triggered checkpoint
-				 * into account (=> +1). Furthermore we have to account for the mis-alignment between checkpoints
-				 * being triggered and our time measurement (=> +1); for T=1200 a total of 3-4 checkpoints may have been
-				 * triggered depending on whether the end of the minimum interval for the first checkpoints ends before
-				 * or after T=200.
-				 */
-				long maxAllowedCheckpoints = (curT - start) / 500 + 2;
-				assertTrue(maxAllowedCheckpoints >= triggeredCheckpoints);
-			}
+			// tell the coordinator that the checkpoint is done
+			final long ackTime = System.nanoTime();
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID, 1L));
 
-			coord.stopCheckpointScheduler();
+			// wait until the next checkpoint is triggered
+			TriggerCheckpoint nextCall = calls.take();
+			final long nextCheckpointTime = System.nanoTime();
 
+			assertEquals(2L, nextCall.getCheckpointId());
+			assertEquals(jid, nextCall.getJob());
+			assertEquals(attemptID, nextCall.getTaskExecutionId());
+
+			final long delayMillis = (nextCheckpointTime - ackTime) / 1_000_000;
+
+			// we need to add one ms here to account for rounding errors
+			if (delayMillis + 1 < delay) {
+				fail("checkpoint came too early: delay was " + delayMillis + " but should have been at least " + delay);
+			}
+		}
+		finally {
+			coord.stopCheckpointScheduler();
 			coord.shutdown();
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}		
 	}
 
 	@Test
 	public void testMaxConcurrentAttempts1() {
-		testMaxConcurrentAttemps(1);
+		testMaxConcurrentAttempts(1);
 	}
 
 	@Test
 	public void testMaxConcurrentAttempts2() {
-		testMaxConcurrentAttemps(2);
+		testMaxConcurrentAttempts(2);
 	}
 
 	@Test
 	public void testMaxConcurrentAttempts5() {
-		testMaxConcurrentAttemps(5);
+		testMaxConcurrentAttempts(5);
 	}
 	
-	private void testMaxConcurrentAttemps(int maxConcurrentAttempts) {
+	private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
 		try {
 			final JobID jid = new JobID();
 


[2/2] flink git commit: [FLINK-5218] [state backends] Eagerly close pending FsCheckpointStateOutputStream on task cancellation

Posted by se...@apache.org.
[FLINK-5218] [state backends] Eagerly close pending FsCheckpointStateOutputStream on task cancellation

This fix contains two modifications:
  1. State backends implement 'Closeable' and register themselves at the 'cancelables'
  2. The FsStateBackend tracks all its unclosed FsCheckpointStateOutputStreams and closes them on 'close()'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59f61bf6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59f61bf6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59f61bf6

Branch: refs/heads/release-1.1
Commit: 59f61bf6cb8351cec9369e2de39c6eeffbda10ea
Parents: e475eb2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 22:38:23 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Dec 1 15:09:43 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |  86 ++++--
 .../runtime/state/AbstractStateBackend.java     |  58 +++-
 .../state/filesystem/FsStateBackend.java        | 159 ++++++++--
 .../state/memory/MemoryStateBackend.java        |   2 +-
 .../FsCheckpointStateOutputStreamTest.java      | 132 ++++++++-
 .../state/FsStateBackendClosingTest.java        |  65 +++++
 .../streaming/runtime/tasks/StreamTask.java     |   7 +
 .../runtime/tasks/BlockingCheckpointsTest.java  | 290 +++++++++++++++++++
 .../streaming/runtime/StateBackendITCase.java   |   2 +-
 9 files changed, 731 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index deba9f9..0412a4a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -312,50 +312,74 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
-	public void dispose() {
-		super.dispose();
-		nonPartitionedStateBackend.dispose();
+	public void dispose() throws Exception {
+		Throwable exception = null;
 
-		// we have to lock because we might have an asynchronous checkpoint going on
-		synchronized (dbCleanupLock) {
-			if (db != null) {
-				if (this.dbOptions != null) {
-					this.dbOptions.dispose();
-					this.dbOptions = null;
-				}
-
-				for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
-					column.f0.dispose();
-				}
+		// make sure the individual states are disposed
+		try {
+			super.dispose();
+		}
+		catch (Throwable t) {
+			exception = t;
+		}
 
-				db.dispose();
-				db = null;
+		try {
+			nonPartitionedStateBackend.dispose();
+		}
+		catch (Throwable t) {
+			if (exception == null) {
+				exception = t;
+			} else {
+				exception.addSuppressed(t);
 			}
 		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		nonPartitionedStateBackend.close();
 
 		// we have to lock because we might have an asynchronous checkpoint going on
-		synchronized (dbCleanupLock) {
-			if (db != null) {
-				if (this.dbOptions != null) {
-					this.dbOptions.dispose();
-					this.dbOptions = null;
-				}
+		// this must also happen in any case, regardless of earlier exceptions
+		try {
+			synchronized (dbCleanupLock) {
+				if (db != null) {
+					if (this.dbOptions != null) {
+						this.dbOptions.dispose();
+						this.dbOptions = null;
+					}
+
+					for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
+						column.f0.dispose();
+					}
 
-				for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
-					column.f0.dispose();
+					db.dispose();
+					db = null;
 				}
+			}
+		}
+		catch (Throwable t) {
+			if (exception == null) {
+				exception = t;
+			} else {
+				exception.addSuppressed(t);
+			}
+		}
 
-				db.dispose();
-				db = null;
+		if (exception != null) {
+			if (exception instanceof Exception) {
+				throw (Exception) exception;
+			} else if (exception instanceof Error) {
+				throw (Error) exception;
+			} else {
+				throw new Exception(exception.getMessage(), exception);
 			}
 		}
 	}
 
+	@Override
+	public void close() throws IOException {
+		// we only close all I/O streams here and do not yet dispose of the native resources
+		// otherwise this can lead to SEGFAULT problems
+		// native resources must only be released in the 'dispose()' method.
+		nonPartitionedStateBackend.close();
+	}
+
 	private File getDbPath(String stateName) {
 		return new File(new File(new File(getNextStoragePath(), jobId.toString()), operatorIdentifier), stateName);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index b86688b..ab9854c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -39,6 +39,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.execution.Environment;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -51,8 +52,8 @@ import java.util.Map;
 /**
  * A state backend defines how state is stored and snapshotted during checkpoints.
  */
-public abstract class AbstractStateBackend implements java.io.Serializable {
-	
+public abstract class AbstractStateBackend implements java.io.Serializable, Closeable {
+
 	private static final long serialVersionUID = 4620413814639220247L;
 
 	protected transient TypeSerializer<?> keySerializer;
@@ -102,23 +103,61 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 	public abstract void disposeAllStateForCurrentJob() throws Exception;
 
 	/**
-	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
-	 * checkpoint data.
+	 * Closes the state backend, dropping and aborting all I/O operations that are currently
+	 * pending.
 	 *
-	 * @throws Exception Exceptions can be forwarded and will be logged by the system
+	 * @throws IOException Exceptions can be forwarded and will be logged by the system
 	 */
-	public abstract void close() throws Exception;
+	public abstract void close() throws IOException;
+
+	/**
+	 * Releases all resources held by this state backend.
+	 * 
+	 * <p>This method must make sure that all resources are disposed, even if an exception happens
+	 * on the way.
+	 * 
+	 * @throws Exception This method should report exceptions that occur.
+	 */
+	public void dispose() throws Exception {
+		Throwable exception = null;
+
+		// make sure things are closed
+		try {
+			close();
+		}
+		catch (Throwable t) {
+			exception = t;
+		}
 
-	public void dispose() {
+		// now actually dispose things
 		lastName = null;
 		lastState = null;
 		if (keyValueStates != null) {
 			for (KvState<?, ?, ?, ?, ?> state : keyValueStates) {
-				state.dispose();
+				try {
+					state.dispose();
+				}
+				catch (Throwable t) {
+					if (exception == null) {
+						exception = t;
+					} else {
+						exception.addSuppressed(t);
+					}
+				}
 			}
 		}
 		keyValueStates = null;
 		keyValueStatesByName = null;
+
+		if (exception != null) {
+			if (exception instanceof Exception) {
+				throw (Exception) exception;
+			} else if (exception instanceof Error) {
+				throw (Error) exception;
+			} else {
+				throw new Exception(exception.getMessage(), exception);
+			}
+		}
 	}
 	
 	// ------------------------------------------------------------------------
@@ -444,6 +483,9 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 		 * @throws IOException Thrown, if the stream cannot be closed.
 		 */
 		public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
+			// we do not flush() here, because that forces the 'CheckpointStateOutputStream' to files,
+			// even when it could stay in a 'small chunk' memory handle.
+			// the 'DataOutputViewStreamWrapper' does not buffer data anyways
 			return new DataInputViewHandle(out.closeAndGetHandle());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 8a8a26d..446f3ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -33,9 +33,10 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,9 +45,13 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The file state backend is a state backend that stores the state of streaming jobs in a file system.
  *
@@ -86,6 +91,10 @@ public class FsStateBackend extends AbstractStateBackend {
 	/** Cached handle to the file system for file operations */
 	private transient FileSystem filesystem;
 
+	/** Set of currently open streams */
+	private transient HashSet<FsCheckpointStateOutputStream> openStreams;
+
+	private transient volatile boolean closed;
 
 	/**
 	 * Creates a new state backend that stores its checkpoint data in the file system and location
@@ -236,9 +245,11 @@ public class FsStateBackend extends AbstractStateBackend {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void initializeForJob(Environment env,
-		String operatorIdentifier,
-		TypeSerializer<?> keySerializer) throws Exception {
+	public void initializeForJob(
+			Environment env,
+			String operatorIdentifier,
+			TypeSerializer<?> keySerializer) throws Exception {
+
 		super.initializeForJob(env, operatorIdentifier, keySerializer);
 
 		Path dir = new Path(basePath, env.getJobID().toString());
@@ -249,6 +260,7 @@ public class FsStateBackend extends AbstractStateBackend {
 		filesystem.mkdirs(dir);
 
 		checkpointDirectory = dir;
+		openStreams = new HashSet<>();
 	}
 
 	@Override
@@ -267,7 +279,42 @@ public class FsStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
-	public void close() throws Exception {}
+	public void close() throws IOException {
+		closed = true;
+
+		// cache a copy on the heap for safety 
+		final HashSet<FsCheckpointStateOutputStream> openStreams = this.openStreams;
+		if (openStreams != null) {
+
+			// we need to draw a copy of the set, since the closing concurrently modifies the set
+			final ArrayList<FsCheckpointStateOutputStream> streams;
+
+			//noinspection SynchronizationOnLocalVariableOrMethodParameter
+			synchronized (openStreams) {
+				streams = new ArrayList<>(openStreams);
+				openStreams.clear();
+			}
+
+			// close all the streams, collect exceptions and record all but the first as suppressed
+			Throwable exception = null;
+			for (FsCheckpointStateOutputStream stream : streams) {
+				try {
+					stream.close();
+				}
+				catch (Throwable t) {
+					if (exception == null) {
+						exception = t;
+					} else {
+						exception.addSuppressed(t);
+					}
+				}
+			}
+
+			if (exception != null) {
+				ExceptionUtils.rethrowIOException(exception);
+			}
+		}
+	}
 
 	// ------------------------------------------------------------------------
 	//  state backend operations
@@ -299,15 +346,22 @@ public class FsStateBackend extends AbstractStateBackend {
 			S state, long checkpointID, long timestamp) throws Exception
 	{
 		checkFileSystemInitialized();
-		
+
 		Path checkpointDir = createCheckpointDirPath(checkpointID);
 		int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
 
-		FsCheckpointStateOutputStream stream = 
-			new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
-		
-		try (ObjectOutputStream os = new ObjectOutputStream(stream)) {
+		try (FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+				checkpointDir, filesystem, openStreams, bufferSize, fileStateThreshold))
+		{
+			// perform the closing double-check AFTER! the creation of the stream
+			if (closed) {
+				throw new IOException("closed");
+			}
+
+			ObjectOutputStream os = new ObjectOutputStream(stream);
 			os.writeObject(state);
+			os.flush();
+
 			return stream.closeAndGetHandle().toSerializableHandle();
 		}
 	}
@@ -318,7 +372,16 @@ public class FsStateBackend extends AbstractStateBackend {
 
 		Path checkpointDir = createCheckpointDirPath(checkpointID);
 		int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
-		return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
+		FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+				checkpointDir, filesystem, openStreams, bufferSize, fileStateThreshold);
+
+		// perform the closing double-check AFTER! the creation of the stream
+		if (closed) {
+			stream.close();
+			throw new IOException("closed");
+		}
+
+		return stream;
 	}
 
 	// ------------------------------------------------------------------------
@@ -426,29 +489,40 @@ public class FsStateBackend extends AbstractStateBackend {
 		private int pos;
 
 		private FSDataOutputStream outStream;
-		
+
 		private final int localStateThreshold;
 
 		private final Path basePath;
 
 		private final FileSystem fs;
-		
+
+		private final HashSet<FsCheckpointStateOutputStream> openStreams;
+
 		private Path statePath;
-		
-		private boolean closed;
+
+		private volatile boolean closed;
 
 		public FsCheckpointStateOutputStream(
-					Path basePath, FileSystem fs,
-					int bufferSize, int localStateThreshold)
+					Path basePath,
+					FileSystem fs,
+					HashSet<FsCheckpointStateOutputStream> openStreams,
+					int bufferSize,
+					int localStateThreshold)
 		{
 			if (bufferSize < localStateThreshold) {
 				throw new IllegalArgumentException();
 			}
-			
+
 			this.basePath = basePath;
 			this.fs = fs;
+			this.openStreams = checkNotNull(openStreams);
 			this.writeBuffer = new byte[bufferSize];
 			this.localStateThreshold = localStateThreshold;
+
+			//noinspection SynchronizationOnLocalVariableOrMethodParameter
+			synchronized (openStreams) {
+				openStreams.add(this);
+			}
 		}
 
 
@@ -519,6 +593,9 @@ public class FsStateBackend extends AbstractStateBackend {
 					pos = 0;
 				}
 			}
+			else {
+				throw new IOException("stream is closed");
+			}
 		}
 
 		/**
@@ -530,6 +607,16 @@ public class FsStateBackend extends AbstractStateBackend {
 		public void close() {
 			if (!closed) {
 				closed = true;
+
+				// make sure that the write() methods cannot succeed any more
+				pos = writeBuffer.length;
+
+				// remove the stream from the open streams set
+				synchronized (openStreams) {
+					openStreams.remove(this);
+				}
+
+				// close all resources
 				if (outStream != null) {
 					try {
 						outStream.close();
@@ -551,15 +638,24 @@ public class FsStateBackend extends AbstractStateBackend {
 		public StreamStateHandle closeAndGetHandle() throws IOException {
 			synchronized (this) {
 				if (!closed) {
+
+					// remove the stream from the open streams set
+					synchronized (openStreams) {
+						openStreams.remove(this);
+					}
+
+					// close all resources
 					if (outStream == null && pos <= localStateThreshold) {
 						closed = true;
 						byte[] bytes = Arrays.copyOf(writeBuffer, pos);
+						pos = writeBuffer.length;
 						return new ByteStreamStateHandle(bytes);
 					}
 					else {
 						flush();
 						outStream.close();
 						closed = true;
+						pos = writeBuffer.length;
 						return new FileStreamStateHandle(statePath);
 					}
 				}
@@ -577,9 +673,17 @@ public class FsStateBackend extends AbstractStateBackend {
 		public Path closeAndGetPath() throws IOException {
 			synchronized (this) {
 				if (!closed) {
-					closed = true;
+
+					// remove the stream from the open streams set
+					synchronized (openStreams) {
+						openStreams.remove(this);
+					}
+
+					// close all resources
 					flush();
 					outStream.close();
+					closed = true;
+					pos = writeBuffer.length;
 					return statePath;
 				}
 				else {
@@ -587,5 +691,22 @@ public class FsStateBackend extends AbstractStateBackend {
 				}
 			}
 		}
+
+		public boolean isClosed() {
+			return closed;
+		}
+
+		// we need referential identity on the streams for the closing set to work
+		// properly, so we implement that via final methods here
+
+		@Override
+		public final int hashCode() {
+			return super.hashCode();
+		}
+
+		@Override
+		public final boolean equals(Object obj) {
+			return this == obj;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 7b9d21b..b155244 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -78,7 +78,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
-	public void close() throws Exception {}
+	public void close() {}
 
 	// ------------------------------------------------------------------------
 	//  State backend operations

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
index 5964b72..3aba9e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
@@ -22,34 +22,36 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend.FsCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashSet;
 import java.util.Random;
 
 import static org.junit.Assert.*;
 
 public class FsCheckpointStateOutputStreamTest {
-	
+
 	/** The temp dir, obtained in a platform neutral way */
 	private static final Path TEMP_DIR_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI());
-	
-	
+
+
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrongParameters() {
 		// this should fail
 		new FsStateBackend.FsCheckpointStateOutputStream(
-			TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 5000);
+			TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 4000, 5000);
 	}
 
-
 	@Test
 	public void testEmptyState() throws Exception {
 		AbstractStateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
-			TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512);
+			TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 1024, 512);
 		
 		StreamStateHandle handle = stream.closeAndGetHandle();
 		assertTrue(handle instanceof ByteStreamStateHandle);
@@ -57,7 +59,101 @@ public class FsCheckpointStateOutputStreamTest {
 		InputStream inStream = handle.getState(ClassLoader.getSystemClassLoader());
 		assertEquals(-1, inStream.read());
 	}
-	
+
+	@Test
+	public void testCloseAndGetPath() throws Exception {
+		FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH,
+				FileSystem.getLocalFileSystem(),
+				new HashSet<FsCheckpointStateOutputStream>(),
+				1024,
+				512);
+
+		stream.write(1);
+
+		Path path = stream.closeAndGetPath();
+		assertNotNull(path);
+
+		// cleanup
+		FileSystem.getLocalFileSystem().delete(path, false);
+	}
+
+	@Test
+	public void testWriteFailsFastWhenClosed() throws Exception {
+		final HashSet<FsCheckpointStateOutputStream> openStreams = new HashSet<>();
+
+		FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+		FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+		FsCheckpointStateOutputStream stream3 = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+		assertFalse(stream1.isClosed());
+		assertFalse(stream2.isClosed());
+		assertFalse(stream3.isClosed());
+
+		// simple close
+		stream1.close();
+
+		// close with handle
+		StreamStateHandle handle = stream2.closeAndGetHandle();
+
+		// close with path
+		Path path = stream3.closeAndGetPath();
+
+		assertTrue(stream1.isClosed());
+		assertTrue(stream2.isClosed());
+		assertTrue(stream3.isClosed());
+
+		validateStreamNotWritable(stream1);
+		validateStreamNotWritable(stream2);
+		validateStreamNotWritable(stream3);
+
+		// clean up
+		handle.discardState();
+		FileSystem.getLocalFileSystem().delete(path, false);
+	}
+
+	@Test
+	public void testAddAndRemoveFromOpenStreamsSet() throws Exception {
+		final HashSet<FsCheckpointStateOutputStream> openStreams = new HashSet<>();
+
+		FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+		FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+		FsCheckpointStateOutputStream stream3 = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+		assertTrue(openStreams.contains(stream1));
+		assertTrue(openStreams.contains(stream2));
+		assertTrue(openStreams.contains(stream3));
+		assertEquals(3, openStreams.size());
+
+		// simple close
+		stream1.close();
+
+		// close with handle
+		StreamStateHandle handle = stream2.closeAndGetHandle();
+
+		// close with path
+		Path path = stream3.closeAndGetPath();
+
+		assertFalse(openStreams.contains(stream1));
+		assertFalse(openStreams.contains(stream2));
+		assertFalse(openStreams.contains(stream3));
+		assertEquals(0, openStreams.size());
+
+		// clean up
+		handle.discardState();
+		FileSystem.getLocalFileSystem().delete(path, false);
+	}
+
 	@Test
 	public void testStateBlowMemThreshold() throws Exception {
 		runTest(222, 999, 512, false);
@@ -72,16 +168,17 @@ public class FsCheckpointStateOutputStreamTest {
 	public void testStateAboveMemThreshold() throws Exception {
 		runTest(576446, 259, 17, true);
 	}
-	
+
 	@Test
 	public void testZeroThreshold() throws Exception {
 		runTest(16678, 4096, 0, true);
 	}
-	
+
 	private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
 		AbstractStateBackend.CheckpointStateOutputStream stream =
 			new FsStateBackend.FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), bufferSize, threshold);
+					TEMP_DIR_PATH, FileSystem.getLocalFileSystem(),
+					new HashSet<FsCheckpointStateOutputStream>(), bufferSize, threshold);
 		
 		Random rnd = new Random();
 		byte[] original = new byte[numBytes];
@@ -125,4 +222,19 @@ public class FsCheckpointStateOutputStreamTest {
 		
 		handle.discardState();
 	}
+
+	private void validateStreamNotWritable(FsCheckpointStateOutputStream stream) {
+		try {
+			stream.write(1);
+			fail();
+		} catch (IOException e) {
+			// expected
+		}
+		try {
+			stream.write(new byte[4], 1, 2);
+			fail();
+		} catch (IOException e) {
+			// expected
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
new file mode 100644
index 0000000..6df488d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend.FsCheckpointStateOutputStream;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FsStateBackendClosingTest {
+
+	@Test
+	public void testStateBackendClosesStreams() throws Exception {
+		final URI tempFolder = new File(EnvironmentInformation.getTemporaryFileDirectory()).toURI();
+		final FsStateBackend backend = new FsStateBackend(tempFolder);
+
+		backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+
+		FsCheckpointStateOutputStream stream = backend.createCheckpointStateOutputStream(17L, 12345L);
+
+		// stream is open, this should succeed
+		assertFalse(stream.isClosed());
+		stream.write(1);
+
+		// close the backend - that should close the stream
+		backend.close();
+
+		assertTrue(stream.isClosed());
+
+		try {
+			stream.write(2);
+			fail("stream is closed, 'write(int)' should fail with an exception");
+		}
+		catch (IOException e) {
+			// expected
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index aaaead0..99df060 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -772,7 +772,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					}
 			}
 		}
+
 		stateBackend.initializeForJob(getEnvironment(), operatorIdentifier, keySerializer);
+
+		// make sure the state backend is closed eagerly in case of cancellation
+		synchronized (cancelables) {
+			cancelables.add(stateBackend);
+		}
+
 		return stateBackend;
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
new file mode 100644
index 0000000..ed993c7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This test checks that task checkpoints that block and do not react to thread interrupts
+ * are still aborted when the task is canceled, because cancellation closes the state backend.
+ */
+public class BlockingCheckpointsTest {
+
+	private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();
+
+	@Test
+	public void testBlockingNonInterruptibleCheckpoint() throws Exception {
+
+		Configuration taskConfig = new Configuration();
+		StreamConfig cfg = new StreamConfig(taskConfig);
+		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+		cfg.setStreamOperator(new TestOperator());
+		cfg.setStateBackend(new LockingStreamStateBackend());
+
+		Task task = createTask(taskConfig);
+
+		// start the task and wait until it is inside the checkpoint
+		task.startTaskThread();
+		IN_CHECKPOINT_LATCH.await();
+
+		// cancel the task and wait. unless cancellation properly closes
+		// the streams, this will never terminate
+		task.cancelExecution();
+		task.getExecutingThread().join();
+
+		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+		assertNull(task.getFailureCause());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static Task createTask(Configuration taskConfig) throws IOException {
+
+		JobInformation jobInformation = new JobInformation(
+			new JobID(),
+			"test job name",
+			new SerializedValue<>(new ExecutionConfig()),
+			new Configuration(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList());
+
+		TaskInformation taskInformation = new TaskInformation(
+			new JobVertexID(),
+			"test task name",
+			1,
+			TestStreamTask.class.getName(),
+			taskConfig);
+
+		return new Task(
+			jobInformation,
+			taskInformation,
+			new ExecutionAttemptID(),
+			0,
+			0,
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			0,
+			null,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			mock(NetworkEnvironment.class),
+			mock(BroadcastVariableManager.class),
+			mock(ActorGateway.class),
+			mock(ActorGateway.class),
+			new FiniteDuration(10, TimeUnit.SECONDS),
+			new FallbackLibraryCacheManager(),
+			new FileCache(new Configuration()),
+			new TaskManagerRuntimeInfo(
+					"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
+			new UnregisteredTaskMetricsGroup());
+		
+	}
+
+	// ------------------------------------------------------------------------
+	//  state backend with locking output stream
+	// ------------------------------------------------------------------------
+
+	private static class LockingStreamStateBackend extends AbstractStateBackend {
+
+		private static final long serialVersionUID = 1L;
+
+		private final LockingOutputStream out = new LockingOutputStream();
+
+		@Override
+		public void disposeAllStateForCurrentJob() {}
+
+		@Override
+		public void close() throws IOException {
+			out.close();
+		}
+
+		@Override
+		public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+			return out;
+		}
+
+		@Override
+		protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, long checkpointID, long timestamp) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	private static final class LockingOutputStream extends CheckpointStateOutputStream implements Serializable {
+		private static final long serialVersionUID = 1L;
+		
+		private final SerializableObject lock = new SerializableObject();
+		private volatile boolean closed;
+
+		@Override
+		public StreamStateHandle closeAndGetHandle() throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void write(int b) throws IOException {
+			// this needs to not react to interrupts until the handle is closed
+			synchronized (lock) {
+				while (!closed) {
+					try {
+						lock.wait();
+					}
+					catch (InterruptedException ignored) {}
+				}
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			synchronized (lock) {
+				closed = true;
+				lock.notifyAll();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test operator that calls into the locking checkpoint output stream
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class TestOperator extends StreamFilter<Object> {
+		private static final long serialVersionUID = 1L;
+
+		public TestOperator() {
+			super(new FilterFunction<Object>() {
+				@Override
+				public boolean filter(Object value) {
+					return false;
+				}
+			});
+		}
+
+		@Override
+		public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+			AbstractStateBackend stateBackend = getStateBackend();
+			CheckpointStateOutputStream outStream = stateBackend.createCheckpointStateOutputStream(checkpointId, timestamp);
+
+			IN_CHECKPOINT_LATCH.trigger();
+
+			// this should lock
+			outStream.write(1);
+
+			// this should be unreachable
+			return null;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  stream task that simply triggers a checkpoint
+	// ------------------------------------------------------------------------
+
+	public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {
+
+		@Override
+		public void init() {}
+
+		@Override
+		protected void run() throws Exception {
+			triggerCheckpointOnBarrier(11L, System.currentTimeMillis());
+		}
+
+		@Override
+		protected void cleanup() {}
+
+		@Override
+		protected void cancelTask() {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 6288946..4eb8b4a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -104,7 +104,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 		public void disposeAllStateForCurrentJob() throws Exception {}
 
 		@Override
-		public void close() throws Exception {}
+		public void close() {}
 
 		@Override
 		protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {