You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/01 20:22:41 UTC

[3/3] flink git commit: [FLINK-5214] Clean up checkpoint data in case of a failing checkpoint operation

[FLINK-5214] Clean up checkpoint data in case of a failing checkpoint operation

Adds exception handling to the stream operators for the snapshotState method. In case of an
exception while performing the snapshot operation, all until then checkpointed data will
be discarded/deleted. This makes sure that a failing checkpoint operation won't leave
orphaned checkpoint data (e.g. files) behind.

Add test case for FsCheckpointStateOutputStream

Add RocksDB FullyAsyncSnapshot cleanup test

Add proper state cleanup tests for window operator

Add state cleanup test for failing snapshot call of AbstractUdfStreamOperator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c058871
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c058871
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c058871

Branch: refs/heads/release-1.1
Commit: 9c058871f778f748059829b1b350687e3f789f6f
Parents: 4b734d7
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Dec 1 13:25:05 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 1 18:04:43 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    | 102 ++++++----
 .../state/RocksDBAsyncKVSnapshotTest.java       |  67 ++++++-
 .../operator/AbstractCEPPatternOperator.java    |  67 +++++--
 .../AbstractKeyedCEPPatternOperator.java        |  53 +++++-
 .../runtime/state/AbstractStateBackend.java     |  25 ++-
 .../state/filesystem/FsStateBackend.java        |  79 ++++++--
 .../apache/flink/runtime/taskmanager/Task.java  |   4 +-
 .../FsCheckpointStateOutputStreamTest.java      | 146 ++++++++++++--
 .../source/ContinuousFileReaderOperator.java    |  80 ++++++--
 .../api/operators/AbstractStreamOperator.java   |  29 ++-
 .../operators/AbstractUdfStreamOperator.java    |  19 +-
 .../operators/GenericWriteAheadSink.java        |  37 +++-
 ...ractAlignedProcessingTimeWindowOperator.java |  59 +++++-
 .../operators/windowing/WindowOperator.java     |  50 ++++-
 .../streaming/runtime/tasks/StreamTask.java     |  29 ++-
 .../runtime/tasks/StreamTaskStateList.java      |   2 +-
 .../streaming/api/operators/StreamMapTest.java  |  64 ++++++-
 .../operators/windowing/WindowOperatorTest.java | 190 ++++++++++++++++++-
 18 files changed, 960 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 0412a4a..1561afc 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -750,59 +750,72 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 
 		@Override
 		public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
-			try {
-				long startTime = System.currentTimeMillis();
+			long startTime = System.currentTimeMillis();
+			CheckpointStateOutputView outputView;
 
-				CheckpointStateOutputView outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);
+			try {
+				try {
+					outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);
+				} catch (Exception e) {
+					throw new Exception("Could not create a checkpoint state output view to " +
+						"materialize the checkpoint data into.", e);
+				}
 
-				outputView.writeInt(columnFamilies.size());
+				try {
+					outputView.writeInt(columnFamilies.size());
 
-				// we don't know how many key/value pairs there are in each column family.
-				// We prefix every written element with a byte that signifies to which
-				// column family it belongs, this way we can restore the column families
-				byte count = 0;
-				Map<String, Byte> columnFamilyMapping = new HashMap<>();
-				for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
-					columnFamilyMapping.put(column.getKey(), count);
+					// we don't know how many key/value pairs there are in each column family.
+					// We prefix every written element with a byte that signifies to which
+					// column family it belongs, this way we can restore the column families
+					byte count = 0;
+					Map<String, Byte> columnFamilyMapping = new HashMap<>();
+					for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
+						columnFamilyMapping.put(column.getKey(), count);
 
-					outputView.writeByte(count);
+						outputView.writeByte(count);
 
-					ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
-					ooOut.writeObject(column.getValue().f1);
-					ooOut.flush();
+						ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
+						ooOut.writeObject(column.getValue().f1);
+						ooOut.flush();
 
-					count++;
-				}
+						count++;
+					}
 
-				ReadOptions readOptions = new ReadOptions();
-				readOptions.setSnapshot(snapshot);
+					ReadOptions readOptions = new ReadOptions();
+					readOptions.setSnapshot(snapshot);
 
-				for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
-					byte columnByte = columnFamilyMapping.get(column.getKey());
+					for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
+						byte columnByte = columnFamilyMapping.get(column.getKey());
 
-					synchronized (dbCleanupLock) {
-						if (db == null) {
-							throw new RuntimeException("RocksDB instance was disposed. This happens " +
+						synchronized (dbCleanupLock) {
+							if (db == null) {
+								throw new RuntimeException("RocksDB instance was disposed. This happens " +
 									"when we are in the middle of a checkpoint and the job fails.");
-						}
-						RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
-						iterator.seekToFirst();
-						while (iterator.isValid()) {
-							outputView.writeByte(columnByte);
-							BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
+							}
+							RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
+							iterator.seekToFirst();
+							while (iterator.isValid()) {
+								outputView.writeByte(columnByte);
+								BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
 									outputView);
-							BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
+								BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
 									outputView);
-							iterator.next();
+								iterator.next();
+							}
 						}
 					}
-				}
-
-				StateHandle<DataInputView> stateHandle = outputView.closeAndGetHandle();
+				} catch (Exception e) {
+					try {
+						// closing the output view deletes the underlying data
+						outputView.close();
+					} catch (Exception closingException) {
+						LOG.warn("Could not close the checkpoint state output view. The " +
+							"written data might not be deleted.", closingException);
+					}
 
-				long endTime = System.currentTimeMillis();
-				LOG.info("Fully asynchronous RocksDB materialization to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
-				return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
+					throw new Exception("Could not write the checkpoint data into the checkpoint " +
+						"state output view.", e);
+				}
 			} finally {
 				synchronized (dbCleanupLock) {
 					if (db != null) {
@@ -811,6 +824,19 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 				}
 				snapshot = null;
 			}
+
+			StateHandle<DataInputView> stateHandle;
+
+			try {
+				stateHandle = outputView.closeAndGetHandle();
+			} catch (Exception ioE) {
+				throw new Exception("Could not close the checkpoint state output view and " +
+					"obtain the state handle.", ioE);
+			}
+
+			long endTime = System.currentTimeMillis();
+			LOG.info("Fully asynchronous RocksDB materialization to {} (asynchronous part) took {} ms.", backupUri, (endTime - startTime));
+			return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
index a58686b..24728d7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
@@ -27,8 +27,13 @@ import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
+import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -48,7 +53,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -56,12 +63,22 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for asynchronous RocksDB Key/Value state checkpoints.
@@ -72,13 +89,16 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("serial")
 public class RocksDBAsyncKVSnapshotTest {
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	@Before
 	public void checkOperatingSystem() {
 		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
 	}
 
 	/**
-	 * This ensures that asynchronous state handles are actually materialized asynchonously.
+	 * This ensures that asynchronous state handles are actually materialized asynchronously.
 	 *
 	 * <p>We use latches to block at various stages and see if the code still continues through
 	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
@@ -180,7 +200,7 @@ public class RocksDBAsyncKVSnapshotTest {
 	}
 
 	/**
-	 * This ensures that asynchronous state handles are actually materialized asynchonously.
+	 * This ensures that asynchronous state handles are actually materialized asynchronously.
 	 *
 	 * <p>We use latches to block at various stages and see if the code still continues through
 	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
@@ -282,6 +302,49 @@ public class RocksDBAsyncKVSnapshotTest {
 		testHarness.waitForTaskCompletion();
 	}
 
+	@Test
+	public void testCleanupOfFullyAsyncSnapshotsInFailureCase() throws Exception {
+		long checkpointId = 1L;
+		long timestamp = 42L;
+
+		File chkDir = temporaryFolder.newFolder();
+		File dbDir = temporaryFolder.newFolder();
+
+		Environment env = new DummyEnvironment("test task", 1, 0);
+		AbstractStateBackend.CheckpointStateOutputStream outputStream = mock(AbstractStateBackend.CheckpointStateOutputStream.class);
+		doThrow(new IOException("Test exception")).when(outputStream).write(anyInt());
+
+		RocksDBStateBackend backend = spy(new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend()));
+		doReturn(outputStream).when(backend).createCheckpointStateOutputStream(anyLong(), anyLong());
+		backend.setDbStoragePath(dbDir.getAbsolutePath());
+		backend.enableFullyAsyncSnapshots();
+
+		backend.initializeForJob(
+			env,
+			"test operator",
+			VoidSerializer.INSTANCE
+		);
+		backend.getPartitionedState(null, VoidSerializer.INSTANCE, new ValueStateDescriptor<>("foobar", Object.class, new Object()));
+
+		Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStateSnapshotHashMap = backend.snapshotPartitionedState(checkpointId, timestamp);
+
+		for (KvStateSnapshot<?, ?, ?, ?, ?> kvStateSnapshot : kvStateSnapshotHashMap.values()) {
+			if (kvStateSnapshot instanceof AsynchronousKvStateSnapshot) {
+				AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asynchronousKvStateSnapshot = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStateSnapshot;
+				try {
+					asynchronousKvStateSnapshot.materialize();
+					fail("Expected an Exception here.");
+				} catch (Exception expected) {
+					//expected exception
+				}
+			} else {
+				fail("Expected an asynchronous kv state snapshot.");
+			}
+		}
+
+		verify(outputStream).close();
+	}
+
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index b523f46..4b5a703 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -111,22 +111,67 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
 
-		final AbstractStateBackend.CheckpointStateOutputStream os = this.getStateBackend().createCheckpointStateOutputStream(
-			checkpointId,
-			timestamp);
+		final AbstractStateBackend.CheckpointStateOutputStream os;
+
+		try {
+			os = this.getStateBackend().createCheckpointStateOutputStream(
+				checkpointId,
+				timestamp);
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state for {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not create checkpoint state output stream for " +
+				getOperatorName() + '.', e);
+		}
 
-		final ObjectOutputStream oos = new ObjectOutputStream(os);
-		final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
+		try {
+			final ObjectOutputStream oos = new ObjectOutputStream(os);
+			final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
 
-		oos.writeObject(nfa);
+			oos.writeObject(nfa);
+			oos.writeInt(priorityQueue.size());
+			oos.flush();
 
-		ov.writeInt(priorityQueue.size());
+			for (StreamRecord<IN> streamRecord : priorityQueue) {
+				streamRecordSerializer.serialize(streamRecord, ov);
+			}
+
+			ov.flush();
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state for {}.", getOperatorName(), discardException);
+			}
 
-		for (StreamRecord<IN> streamRecord: priorityQueue) {
-			streamRecordSerializer.serialize(streamRecord, ov);
+			try {
+				// closing the output stream should delete the written data
+				os.close();
+			} catch (Exception closeException) {
+				LOG.warn("Could not close the checkpoint state output stream of {}. The written " +
+					"data might not be deleted.", getOperatorName(), closeException);
+			}
+
+			throw new Exception("Could not write state for " + getOperatorName() +
+				" to checkpoint state output stream.", e);
 		}
 
-		taskState.setOperatorState(os.closeAndGetHandle());
+		try {
+			taskState.setOperatorState(os.closeAndGetHandle());
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state for {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not close and get state handle from checkpoint state " +
+				"output stream of " + getOperatorName() + '.', e);
+		}
 
 		return taskState;
 	}
@@ -144,7 +189,7 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 
 		nfa = (NFA<IN>)ois.readObject();
 
-		int numberPriorityQueueEntries = div.readInt();
+		int numberPriorityQueueEntries = ois.readInt();
 
 		priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 03e40ac..cb456b8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -191,15 +191,58 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
 
-		AbstractStateBackend.CheckpointStateOutputView ov = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+		final AbstractStateBackend.CheckpointStateOutputView ov;
+
+		try {
+			ov = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
 
-		ov.writeInt(keys.size());
+			throw new Exception("Could not create checkpoint state output view for " +
+				getOperatorName() + '.', e);
+		}
 
-		for (KEY key: keys) {
-			keySerializer.serialize(key, ov);
+		try {
+			ov.writeInt(keys.size());
+
+			for (KEY key : keys) {
+				keySerializer.serialize(key, ov);
+			}
+		} catch (Exception exception) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			try {
+				// closing the output view should delete any written data
+				ov.close();
+			} catch (IOException closingException) {
+				LOG.warn("Could not close the checkpoint state output view of {}. The written data " +
+					"might not be deleted.", getOperatorName(), closingException);
+			}
+
+			throw new Exception("Could not write state of " + getOperatorName() +
+				" to the checkpoint state output view.", exception);
 		}
 
-		taskState.setOperatorState(ov.closeAndGetHandle());
+		try {
+			taskState.setOperatorState(ov.closeAndGetHandle());
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not close and get state handle from checkpoint state " +
+				"output view of " + getOperatorName() + '.', e);
+		}
 
 		return taskState;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index ab9854c..068c6b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -38,6 +38,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.execution.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -56,6 +58,8 @@ public abstract class AbstractStateBackend implements java.io.Serializable, Clos
 
 	private static final long serialVersionUID = 4620413814639220247L;
 
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractStateBackend.class);
+
 	protected transient TypeSerializer<?> keySerializer;
 
 	protected transient ClassLoader userCodeClassLoader;
@@ -354,10 +358,25 @@ public abstract class AbstractStateBackend implements java.io.Serializable, Clos
 		if (keyValueStates != null) {
 			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());
 
-			for (Map.Entry<String, KvState<?, ?, ?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
-				KvStateSnapshot<?, ?, ?, ?, ?> snapshot = entry.getValue().snapshot(checkpointId, timestamp);
-				snapshots.put(entry.getKey(), snapshot);
+			try {
+				for (Map.Entry<String, KvState<?, ?, ?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
+					KvStateSnapshot<?, ?, ?, ?, ?> snapshot = entry.getValue().snapshot(checkpointId, timestamp);
+					snapshots.put(entry.getKey(), snapshot);
+				}
+			} catch (Exception e) {
+				for (Map.Entry<String, KvStateSnapshot<?, ?, ?, ?, ?>> entry : snapshots.entrySet()) {
+					KvStateSnapshot<?, ?, ?, ?, ?> kvStateSnapshot = entry.getValue();
+
+					try {
+						kvStateSnapshot.discardState();
+					} catch (Exception discardException) {
+						LOG.warn("Could not discard partitioned state {}.", entry.getKey(), discardException);
+					}
+				}
+
+				throw new Exception("Could not create a partitioned state snapshot.", e);
 			}
+
 			return snapshots;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 446f3ea..e783264 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -363,6 +363,8 @@ public class FsStateBackend extends AbstractStateBackend {
 			os.flush();
 
 			return stream.closeAndGetHandle().toSerializableHandle();
+		} catch (IOException ioE) {
+			throw new IOException("Could not serialize state.", ioE);
 		}
 	}
 
@@ -602,6 +604,8 @@ public class FsStateBackend extends AbstractStateBackend {
 		 * If the stream is only closed, we remove the produced file (cleanup through the auto close
 		 * feature, for example). This method throws no exception if the deletion fails, but only
 		 * logs the error.
+		 *
+		 * Important: This method should never throw any {@link Throwable}.
 		 */
 		@Override
 		public void close() {
@@ -620,15 +624,19 @@ public class FsStateBackend extends AbstractStateBackend {
 				if (outStream != null) {
 					try {
 						outStream.close();
-						fs.delete(statePath, false);
-
-						// attempt to delete the parent (will fail and be ignored if the parent has more files)
+					} catch (Throwable e) {
+						LOG.warn("Cannot delete closed and discarded state stream for {}.", statePath, e);
+					} finally {
 						try {
-							fs.delete(basePath, false);
-						} catch (IOException ignored) {}
-					}
-					catch (Exception e) {
-						LOG.warn("Cannot delete closed and discarded state stream for " + statePath, e);
+							fs.delete(statePath, false);
+
+							// attempt to delete the parent (will fail and be ignored if the parent has more files)
+							try {
+								fs.delete(basePath, false);
+							} catch (Throwable ignored) {}
+						} catch (Throwable ioE) {
+							LOG.warn("Could not delete stream file for {}.", statePath, ioE);
+						}
 					}
 				}
 			}
@@ -650,12 +658,29 @@ public class FsStateBackend extends AbstractStateBackend {
 						byte[] bytes = Arrays.copyOf(writeBuffer, pos);
 						pos = writeBuffer.length;
 						return new ByteStreamStateHandle(bytes);
-					}
-					else {
-						flush();
-						outStream.close();
-						closed = true;
-						pos = writeBuffer.length;
+					} else {
+						try {
+							flush();
+							outStream.close();
+						} catch (Exception exception) {
+							LOG.warn("Could not close the file system output stream. Trying to delete the underlying file.");
+
+							try {
+								fs.delete(statePath, false);
+
+								try {
+									fs.delete(basePath, false);
+								} catch (Throwable ignored) {}
+							} catch (Throwable deleteException) {
+								LOG.warn("Could not delete close and discarded state stream for {}.", statePath, deleteException);
+							}
+
+							throw new IOException("Could not close the file system output stream.", exception);
+						} finally {
+							closed = true;
+							pos = writeBuffer.length;
+						}
+
 						return new FileStreamStateHandle(statePath);
 					}
 				}
@@ -680,10 +705,28 @@ public class FsStateBackend extends AbstractStateBackend {
 					}
 
 					// close all resources
-					flush();
-					outStream.close();
-					closed = true;
-					pos = writeBuffer.length;
+					try {
+						flush();
+						outStream.close();
+					} catch (Exception exception) {
+						LOG.warn("Could not close the file system output stream. Trying to delete the underlying file.");
+
+						try {
+							fs.delete(statePath, false);
+
+							try {
+								fs.delete(basePath, false);
+							} catch (Throwable ignored) {}
+						} catch (Throwable deleteException) {
+							LOG.warn("Could not delete close and discarded state stream for {}.", statePath, deleteException);
+						}
+
+						throw new IOException("Could not close the file system output stream.", exception);
+					} finally {
+						closed = true;
+						pos = writeBuffer.length;
+					}
+
 					return statePath;
 				}
 				else {

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 9565666..e4e1b36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1008,8 +1008,8 @@ public class Task implements Runnable {
 						catch (Throwable t) {
 							if (getExecutionState() == ExecutionState.RUNNING) {
 								failExternally(new Exception(
-									"Error while triggering checkpoint for " + taskName,
-									t));
+									"Error while triggering checkpoint " + checkpointID + " for " +
+										taskName, t));
 							}
 						}
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
index 3aba9e1..30747ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
@@ -25,33 +26,50 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend.FsCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.ArgumentCaptor;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashSet;
 import java.util.Random;
 
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class FsCheckpointStateOutputStreamTest {
 
-	/** The temp dir, obtained in a platform neutral way */
-	private static final Path TEMP_DIR_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI());
+	/** The temp dir */
+	private Path tempDirPath = null;
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Before
+	public void setup() throws IOException {
+		tempDirPath = new Path(temporaryFolder.newFolder().toURI());
+	}
 
 	@Test(expected = IllegalArgumentException.class)
-	public void testWrongParameters() {
+	public void testWrongParameters() throws IOException {
 		// this should fail
 		new FsStateBackend.FsCheckpointStateOutputStream(
-			TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 4000, 5000);
+			tempDirPath, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 4000, 5000);
 	}
 
 	@Test
 	public void testEmptyState() throws Exception {
 		AbstractStateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
-			TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 1024, 512);
+			tempDirPath, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 1024, 512);
 		
 		StreamStateHandle handle = stream.closeAndGetHandle();
 		assertTrue(handle instanceof ByteStreamStateHandle);
@@ -63,7 +81,7 @@ public class FsCheckpointStateOutputStreamTest {
 	@Test
 	public void testCloseAndGetPath() throws Exception {
 		FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH,
+				tempDirPath,
 				FileSystem.getLocalFileSystem(),
 				new HashSet<FsCheckpointStateOutputStream>(),
 				1024,
@@ -83,13 +101,13 @@ public class FsCheckpointStateOutputStreamTest {
 		final HashSet<FsCheckpointStateOutputStream> openStreams = new HashSet<>();
 
 		FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+				tempDirPath, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
 
 		FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+				tempDirPath, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
 
 		FsCheckpointStateOutputStream stream3 = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+				tempDirPath, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
 
 		assertFalse(stream1.isClosed());
 		assertFalse(stream2.isClosed());
@@ -122,13 +140,13 @@ public class FsCheckpointStateOutputStreamTest {
 		final HashSet<FsCheckpointStateOutputStream> openStreams = new HashSet<>();
 
 		FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+				tempDirPath, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
 
 		FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+				tempDirPath, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
 
 		FsCheckpointStateOutputStream stream3 = new FsCheckpointStateOutputStream(
-				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+				tempDirPath, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
 
 		assertTrue(openStreams.contains(stream1));
 		assertTrue(openStreams.contains(stream2));
@@ -174,10 +192,110 @@ public class FsCheckpointStateOutputStreamTest {
 		runTest(16678, 4096, 0, true);
 	}
 
+	/**
+	 * Tests that the underlying stream file is deleted upon calling close.
+	 */
+	@Test
+	public void testCleanupWhenClosingStream() throws IOException {
+
+		final FileSystem fs = mock(FileSystem.class);
+		final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
+
+		final ArgumentCaptor<Path>  pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+		when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream);
+
+		AbstractStateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
+			tempDirPath,
+			fs,
+			new HashSet<FsCheckpointStateOutputStream>(),
+			4,
+			0);
+
+		// this should create the underlying file stream
+		stream.write(new byte[]{1,2,3,4,5});
+
+		verify(fs).create(any(Path.class), anyBoolean());
+
+		stream.close();
+
+		verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
+	}
+
+	/**
+	 * Tests that the underlying stream file is deleted if the closeAndGetHandle method fails.
+	 */
+	@Test
+	public void testCleanupWhenFailingCloseAndGetHandle() throws IOException {
+		final FileSystem fs = mock(FileSystem.class);
+		final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
+
+		final ArgumentCaptor<Path>  pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+		when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream);
+		doThrow(new IOException("Test IOException.")).when(outputStream).close();
+
+		AbstractStateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
+			tempDirPath,
+			fs,
+			new HashSet<FsCheckpointStateOutputStream>(),
+			4,
+			0);
+
+		// this should create the underlying file stream
+		stream.write(new byte[]{1,2,3,4,5});
+
+		verify(fs).create(any(Path.class), anyBoolean());
+
+		try {
+			stream.closeAndGetHandle();
+			fail("Expected IOException");
+		} catch (IOException ioE) {
+			// expected exception
+		}
+
+		verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
+	}
+
+	/**
+	 * Tests that the underlying stream file is deleted if the closeAndGetPath method fails.
+	 */
+	@Test
+	public void testCleanupWhenFailingCloseAndGetPath() throws IOException {
+		final FileSystem fs = mock(FileSystem.class);
+		final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
+
+		final ArgumentCaptor<Path>  pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+		when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream);
+		doThrow(new IOException("Test IOException.")).when(outputStream).close();
+
+		FsStateBackend.FsCheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
+			tempDirPath,
+			fs,
+			new HashSet<FsCheckpointStateOutputStream>(),
+			4,
+			0);
+
+		// this should create the underlying file stream
+		stream.write(new byte[]{1,2,3,4,5});
+
+		verify(fs).create(any(Path.class), anyBoolean());
+
+		try {
+			stream.closeAndGetPath();
+			fail("Expected IOException");
+		} catch (IOException ioE) {
+			// expected exception
+		}
+
+		verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
+	}
+	
 	private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
 		AbstractStateBackend.CheckpointStateOutputStream stream =
 			new FsStateBackend.FsCheckpointStateOutputStream(
-					TEMP_DIR_PATH, FileSystem.getLocalFileSystem(),
+					tempDirPath, FileSystem.getLocalFileSystem(),
 					new HashSet<FsCheckpointStateOutputStream>(), bufferSize, threshold);
 		
 		Random rnd = new Random();

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 923943f..cb0b001 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -406,29 +406,75 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
 
-		final AbstractStateBackend.CheckpointStateOutputStream os =
-			this.getStateBackend().createCheckpointStateOutputStream(checkpointId, timestamp);
+		final AbstractStateBackend.CheckpointStateOutputStream os;
 
-		final ObjectOutputStream oos = new ObjectOutputStream(os);
-		final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
+		try {
+			os = this.getStateBackend().createCheckpointStateOutputStream(checkpointId, timestamp);
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not create the checkpoint state output stream for " +
+				getOperatorName() + '.', e);
+		}
+
+		try {
+			final ObjectOutputStream oos = new ObjectOutputStream(os);
+			final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
+
+			Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = this.reader.getReaderState();
+			List<FileInputSplit> pendingSplits = readerState.f0;
+			FileInputSplit currSplit = readerState.f1;
+			S formatState = readerState.f2;
+
+			// write the current split
+			oos.writeObject(currSplit);
+
+			// write the pending ones
+			ov.writeInt(pendingSplits.size());
+			for (FileInputSplit split : pendingSplits) {
+				oos.writeObject(split);
+			}
 
-		Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = this.reader.getReaderState();
-		List<FileInputSplit> pendingSplits = readerState.f0;
-		FileInputSplit currSplit = readerState.f1;
-		S formatState = readerState.f2;
+			// write the state of the reading channel
+			oos.writeObject(formatState);
 
-		// write the current split
-		oos.writeObject(currSplit);
+			oos.flush();
+		} catch (Exception exception) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard the stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			try {
+				// closing the checkpoint output stream should delete the written data
+				os.close();
+			} catch (Exception closingException) {
+				LOG.warn("Could not close the checkpoint state output stream belonging to " +
+					"{}. The written data might not be deleted.", getOperatorName(), closingException);
+			}
+
+			throw new Exception("Could not write the stream task state of " + getOperatorName()
+				+ " into the checkpoint state output view.", exception);
+		}
+
+		try {
+			taskState.setOperatorState(os.closeAndGetHandle());
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
 
-		// write the pending ones
-		ov.writeInt(pendingSplits.size());
-		for (FileInputSplit split : pendingSplits) {
-			oos.writeObject(split);
+			throw new Exception("Could not close and get state handle from checkpoint state " +
+				"output stream belonging to " + getOperatorName() + '.', e);
 		}
 
-		// write the state of the reading channel
-		oos.writeObject(formatState);
-		taskState.setOperatorState(os.closeAndGetHandle());
 		return taskState;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index d51c320..41b5f7e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -181,10 +181,16 @@ public abstract class AbstractStreamOperator<OUT>
 		StreamTaskState state = new StreamTaskState();
 
 		if (stateBackend != null) {
-			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
-				stateBackend.snapshotPartitionedState(checkpointId, timestamp);
-			if (partitionedSnapshots != null) {
-				state.setKvStates(partitionedSnapshots);
+			try {
+				HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
+					stateBackend.snapshotPartitionedState(checkpointId, timestamp);
+
+				if (partitionedSnapshots != null) {
+					state.setKvStates(partitionedSnapshots);
+				}
+			} catch (Exception e) {
+				throw new Exception("Failed to snapshot partitioned state for operator " +
+					getOperatorName() + '.', e);
 			}
 		}
 
@@ -234,6 +240,21 @@ public abstract class AbstractStreamOperator<OUT>
 	public ClassLoader getUserCodeClassloader() {
 		return container.getUserCodeClassLoader();
 	}
+
+	/**
+	 * Return the operator name. If the runtime context has been set, then the task name with
+	 * subtask index is returned. Otherwise, the simple class name is returned.
+	 *
+	 * @return If runtime context is set, then return task name with subtask index. Otherwise return
+	 * 			simple class name.
+	 */
+	protected String getOperatorName() {
+		if (runtimeContext != null) {
+			return runtimeContext.getTaskNameWithSubtasks();
+		} else {
+			return getClass().getSimpleName();
+		}
+	}
 	
 	/**
 	 * Returns a context that allows the operator to query information about the execution and also

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 1ddd934..2fe1326 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -129,7 +129,14 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends
 				udfState = chkFunction.snapshotState(checkpointId, timestamp);
 			} 
 			catch (Exception e) {
-				throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
+				try {
+					state.discardState();
+				} catch (Exception discardException) {
+					LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+				}
+
+				throw new Exception("Failed to snapshot function state of " +
+					getOperatorName() + '.', e);
 			}
 			
 			if (udfState != null) {
@@ -140,8 +147,14 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends
 					state.setFunctionState(handle);
 				}
 				catch (Exception e) {
-					throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
-							+ e.getMessage(), e);
+					try {
+						state.discardState();
+					} catch (Exception discardException) {
+						LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+					}
+
+					throw new Exception("Failed to add the function state snapshot of " +
+						getOperatorName() + " to the checkpoint.", e);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index afc28f4..497f85a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -89,14 +89,32 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 	private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
 		//only add handle if a new OperatorState was created since the last snapshot
 		if (out != null) {
-			StateHandle<DataInputView> handle = out.closeAndGetHandle();
+			StateHandle<DataInputView> handle;
+
+			try {
+				handle = out.closeAndGetHandle();
+			} catch (Exception e) {
+				throw new Exception("Could not close and get state handle from checkpoint " +
+					"state output view belonging to " +
+					getOperatorName() + '.', e);
+			}
+
 			if (state.pendingHandles.containsKey(checkpointId)) {
 				//we already have a checkpoint stored for that ID that may have been partially written,
 				//so we discard this "alternate version" and use the stored checkpoint
-				handle.discardState();
+				try {
+					handle.discardState();
+				} catch (Exception exception) {
+					LOG.warn("Could not discard state handle for checkpoint {} of {}, " +
+						"which already has been stored.", checkpointId,
+						getOperatorName(), exception);
+				}
 			} else {
 				state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
 			}
+
+			// only set out stream to null in case that we could obtain a state handle
+			// otherwise we might lose some data if we allow failing checkpoints
 			out = null;
 		}
 	}
@@ -104,7 +122,20 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 	@Override
 	public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-		saveHandleInState(checkpointId, timestamp);
+
+		try {
+			saveHandleInState(checkpointId, timestamp);
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not save handle in state of " +
+				getOperatorName() + '.', e);
+		}
+
 		taskState.setFunctionState(state);
 		return taskState;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index fdc8117..c84bc7f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -248,15 +248,58 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
 		
 		// we write the panes with the key/value maps into the stream, as well as when this state
-		// should have triggered and slided
-		AbstractStateBackend.CheckpointStateOutputView out =
-				getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+		// should have triggered and slidedq
+		AbstractStateBackend.CheckpointStateOutputView out;
+
+		try {
+			out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state for {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not create checkpoint state output view to write the " +
+				getOperatorName() + " state into.", e);
+		}
+
+		try {
+			out.writeLong(nextEvaluationTime);
+			out.writeLong(nextSlideTime);
+			panes.writeToOutput(out, keySerializer, stateTypeSerializer);
+		} catch (Exception ioE) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state for {}.", getOperatorName(), discardException);
+			}
+
+			try {
+				// closing the checkpoint output view stream should delete the written data
+				out.close();
+			} catch (Exception closingException) {
+				LOG.warn("Could not close the checkpointed output view for {}. The written data " +
+					"might not be deleted.", getOperatorName(), closingException);
+			}
+
+			throw new Exception("Could not write the state for " + getOperatorName() +
+				" into the checkpoint state output view.", ioE);
+		}
+
+		try {
+			taskState.setOperatorState(out.closeAndGetHandle());
+		} catch (Exception ioE) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state for {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not close and obtain the state handle from the checkpoint " +
+				"output view of " + getOperatorName() + '.', ioE);
+		}
 
-		out.writeLong(nextEvaluationTime);
-		out.writeLong(nextSlideTime);
-		panes.writeToOutput(out, keySerializer, stateTypeSerializer);
-		
-		taskState.setOperatorState(out.closeAndGetHandle());
 		return taskState;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 2434843..16784d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -871,12 +871,54 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
 
-		AbstractStateBackend.CheckpointStateOutputView out =
-			getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+		final AbstractStateBackend.CheckpointStateOutputView out;
 
-		snapshotTimers(out);
+		try {
+			out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+		} catch (Exception ioE) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not create checkpoint state output view for " +
+				getOperatorName() + '.', ioE);
+		}
+
+		try {
+			snapshotTimers(out);
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			try {
+				// closing the checkpoint output stream should delete the written data
+				out.close();
+			} catch (Exception closingException) {
+				LOG.warn("Could not close the checkpoint state output view of {}. The written data " +
+					"might not be deleted.", getOperatorName(), closingException);
+			}
+
+			throw new Exception("Could not snapshot the window operators timers of " +
+				getOperatorName() + '.', e);
+		}
 
-		taskState.setOperatorState(out.closeAndGetHandle());
+		try {
+			taskState.setOperatorState(out.closeAndGetHandle());
+		} catch (Exception e) {
+			try {
+				taskState.discardState();
+			} catch (Exception discardException) {
+				LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardException);
+			}
+
+			throw new Exception("Could not close and get state handle from checkpoint output view of " +
+				getOperatorName() + '.', e);
+		}
 
 		return taskState;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 99df060..9531974 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -602,7 +603,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			throw e;
 		}
 		catch (Exception e) {
-			throw new Exception("Error while performing a checkpoint", e);
+			throw new Exception("Error while performing checkpoint " + checkpointId + '.', e);
 		}
 	}
 
@@ -671,7 +672,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				if (allStates.isEmpty()) {
 					getEnvironment().acknowledgeCheckpoint(checkpointId);
 				} else if (!hasAsyncStates) {
-					this.lastCheckpointSize = allStates.getStateSize();
+					try {
+						this.lastCheckpointSize = allStates.getStateSize();
+					} catch (Exception ioE) {
+						LOG.warn("Could not calculate the total state size for checkpoint {}.", checkpointId, ioE);
+					}
+
 					getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
 				} else {
 					// start a Thread that does the asynchronous materialization and
@@ -694,11 +700,26 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
 				// yet be created
 				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointId);
+
+				Exception exception = null;
+
 				for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-					output.writeEventToAllChannels(message);
+					try {
+						output.writeEventToAllChannels(message);
+					} catch (IOException ioE) {
+						if (exception == null) {
+							exception = new Exception("Could not send CancelCheckpointMarker to downstream tasks.", ioE);
+						} else {
+							exception.addSuppressed(ioE);
+						}
+					}
 				}
 
-				return false;
+				if (exception == null) {
+					return false;
+				} else {
+					throw exception;
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
index ae85d86..4a208a4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
@@ -37,7 +37,7 @@ public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
 	/** The states for all operator */
 	private final StreamTaskState[] states;
 
-	public StreamTaskStateList(StreamTaskState[] states) throws Exception {
+	public StreamTaskStateList(StreamTaskState[] states) {
 		this.states = states;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
index f0113d1..184e67b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
@@ -17,17 +17,28 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
  * Tests for {@link StreamMap}. These test that:
@@ -38,6 +49,8 @@ import org.junit.Test;
  *     <li>Watermarks are correctly forwarded</li>
  * </ul>
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(AbstractStreamOperator.class)
 public class StreamMapTest {
 
 	private static class Map implements MapFunction<Integer, String> {
@@ -91,6 +104,51 @@ public class StreamMapTest {
 		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
 	}
 
+	@Test
+	public void testFailingSnapshot() throws Exception {
+		final long checkpointId = 1L;
+		final long timestamp = 42L;
+
+		StreamTaskState streamTaskState = mock(StreamTaskState.class);
+		whenNew(StreamTaskState.class).withAnyArguments().thenReturn(streamTaskState);
+
+		StreamMap<String, String> operator = new StreamMap<>(new TestCheckpointedMapFunction());
+
+		OneInputStreamOperatorTestHarness<String, String> testHarness = new OneInputStreamOperatorTestHarness<String, String>(operator);
+
+		testHarness.open();
+
+		try {
+			testHarness.snapshot(checkpointId, timestamp);
+
+			fail("Expected exception here.");
+		} catch (Exception expected) {
+			// expected exception
+		}
+
+		verify(streamTaskState).discardState();
+	}
+
+	private static class TestCheckpointedMapFunction implements MapFunction<String, String>, Checkpointed<String> {
+
+		private static final long serialVersionUID = 2353250741656753525L;
+
+		@Override
+		public String map(String value) throws Exception {
+			return value;
+		}
+
+		@Override
+		public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			throw new IOException("Test exception.");
+		}
+
+		@Override
+		public void restoreState(String state) throws Exception {
+			// noop
+		}
+	}
+
 	// This must only be used in one test, otherwise the static fields will be changed
 	// by several tests concurrently
 	private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
@@ -103,7 +161,7 @@ public class StreamMapTest {
 		public void open(Configuration parameters) throws Exception {
 			super.open(parameters);
 			if (closeCalled) {
-				Assert.fail("Close called before open.");
+				fail("Close called before open.");
 			}
 			openCalled = true;
 		}
@@ -112,7 +170,7 @@ public class StreamMapTest {
 		public void close() throws Exception {
 			super.close();
 			if (!openCalled) {
-				Assert.fail("Open was not called before close.");
+				fail("Open was not called before close.");
 			}
 			closeCalled = true;
 		}
@@ -120,7 +178,7 @@ public class StreamMapTest {
 		@Override
 		public String map(String value) throws Exception {
 			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
+				fail("Open was not called before run.");
 			}
 			return value;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c058871/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index dfa353c..72939d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -32,13 +32,14 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -70,7 +71,11 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -81,7 +86,16 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(AbstractStreamOperator.class)
 public class WindowOperatorTest {
 
 	// For counting if close() is called the correct number of times on the SumReducer
@@ -2391,6 +2405,178 @@ public class WindowOperatorTest {
 		testHarness.close();
 	}
 
+	/**
+	 * Tests that the StreamTaskState and the CheckpointStateOutputStream are discarded and closed
+	 * in case of a failure while writing to the CheckpointStateOutputStream.
+	 */
+	@Test
+	public void testCleanupInCaseOfFailingSnapshotCall() throws Exception {
+		final int WINDOW_SIZE = 10;
+		final long checkpointId = 1L;
+		final long timestamp = 42L;
+
+		StreamTaskState streamTaskState = mock(StreamTaskState.class);
+		AbstractStateBackend stateBackend = mock(AbstractStateBackend.class);
+		AbstractStateBackend.CheckpointStateOutputStream outputStream = mock(AbstractStateBackend.CheckpointStateOutputStream.class);
+
+		doThrow(new IOException("Test Exception")).when(outputStream).write(anyInt());
+
+		when(stateBackend.createCheckpointStateOutputView(anyLong(), anyLong())).thenCallRealMethod();
+		when(stateBackend.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(outputStream);
+		whenNew(StreamTaskState.class).withAnyArguments().thenReturn(streamTaskState);
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+			new SumReducer(),
+			inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
+			PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
+			0);
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
+			"Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.setStateBackend(stateBackend);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.open();
+
+		try {
+			testHarness.snapshot(checkpointId, timestamp);
+			fail("Expected Exception here.");
+		} catch (Exception expected) {
+			// expected the exception here
+		}
+
+		verify(outputStream).close();
+		verify(streamTaskState).discardState();
+	}
+
+	/**
+	 * Tests that the StreamTaskState is discarded in case of a failure while obtaining the
+	 * CheckpointStateOutputStream.
+	 */
+	@Test
+	public void testCleanupInCaseOfFailingCheckpointStateOutputStreamCreation() throws Exception {
+		final int WINDOW_SIZE = 10;
+		final long checkpointId = 1L;
+		final long timestamp = 42L;
+
+		StreamTaskState streamTaskState = mock(StreamTaskState.class);
+		AbstractStateBackend stateBackend = mock(AbstractStateBackend.class);
+
+		when(stateBackend.createCheckpointStateOutputView(anyLong(), anyLong())).thenThrow(new IOException("Test Exception"));
+		whenNew(StreamTaskState.class).withAnyArguments().thenReturn(streamTaskState);
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+			new SumReducer(),
+			inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
+			PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
+			0);
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
+			"Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.setStateBackend(stateBackend);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.open();
+
+		try {
+			testHarness.snapshot(checkpointId, timestamp);
+			fail("Expected Exception here.");
+		} catch (Exception expected) {
+			// expected the exception here
+		}
+
+		verify(streamTaskState).discardState();
+	}
+
+	/**
+	 * Tests that the StreamTaskState is discarded in case of a failure while closing and getting
+	 * the state handle from the CheckpointStateOutputStream.
+	 */
+	@Test
+	public void testCleanupInCaseOfFailingCloseAndGetHandleInSnapshotMethod() throws Exception {
+		final int WINDOW_SIZE = 10;
+		final long checkpointId = 1L;
+		final long timestamp = 42L;
+
+		StreamTaskState streamTaskState = mock(StreamTaskState.class);
+		AbstractStateBackend stateBackend = mock(AbstractStateBackend.class);
+		AbstractStateBackend.CheckpointStateOutputStream outputStream = mock(AbstractStateBackend.CheckpointStateOutputStream.class);
+
+		doThrow(new IOException("Test Exception")).when(outputStream).closeAndGetHandle();
+
+		when(stateBackend.createCheckpointStateOutputView(anyLong(), anyLong())).thenCallRealMethod();
+		when(stateBackend.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(outputStream);
+		whenNew(StreamTaskState.class).withAnyArguments().thenReturn(streamTaskState);
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+			new SumReducer(),
+			inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
+			GlobalWindows.create(),
+			new GlobalWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
+			PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
+			0);
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
+			"Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.setStateBackend(stateBackend);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.open();
+
+		try {
+			testHarness.snapshot(checkpointId, timestamp);
+			fail("Expected Exception here.");
+		} catch (Exception expected) {
+			// expected the exception here
+		}
+
+		verify(outputStream).closeAndGetHandle();
+		verify(streamTaskState).discardState();
+	}
+
 	// ------------------------------------------------------------------------
 	//  UDFs
 	// ------------------------------------------------------------------------