You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/02 19:36:32 UTC

flink git commit: [FLINK-5229] [tm] Discard completed checkpoints if a subsequent operator's checkpoint fails

Repository: flink
Updated Branches:
  refs/heads/release-1.1 388acbca9 -> 020da2ce1


[FLINK-5229] [tm] Discard completed checkpoints if a subsequent operator's checkpoint fails

If any StreamOperator#snapshotOperatorState call fails, all StreamTaskStates created up to that
point are discarded. Likewise, if materializing the asynchronous state of a checkpoint fails, all
of the task's StreamTaskStates are discarded. This ensures that a failing checkpoint operation of
a chained operator won't leave orphaned checkpoint data behind.

This closes #2924.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/020da2ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/020da2ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/020da2ce

Branch: refs/heads/release-1.1
Commit: 020da2ce1c8be83789252d0db959896a761d7513
Parents: 388acbc
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 2 15:33:06 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 2 19:10:46 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    | 214 ++++++++++++-------
 .../org/apache/flink/util/ExceptionUtils.java   |  50 +++++
 .../runtime/checkpoint/PendingCheckpoint.java   |  14 +-
 .../state/AsynchronousKvStateSnapshot.java      |   5 -
 .../runtime/state/AsynchronousStateHandle.java  |   5 -
 .../streaming/runtime/tasks/StreamTask.java     |  69 ++++--
 .../runtime/tasks/StreamTaskState.java          |  28 ++-
 .../tasks/StreamTaskAsyncCheckpointTest.java    |   5 +
 .../streaming/runtime/tasks/StreamTaskTest.java | 105 +++++++++
 9 files changed, 377 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 1561afc..3d75bde 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -77,6 +77,7 @@ import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 
+import org.rocksdb.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -638,11 +639,15 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 */
 	private static class SemiAsyncSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
 		private static final long serialVersionUID = 1L;
+
+		private final SerializableObject lock = new SerializableObject();
 		private final File localBackupPath;
 		private final URI backupUri;
 		private final List<StateDescriptor> stateDescriptors;
 		private final long checkpointId;
 
+		private volatile boolean discarded;
+
 		private SemiAsyncSnapshot(File localBackupPath,
 				URI backupUri,
 				List<StateDescriptor> columnFamilies,
@@ -651,22 +656,45 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			this.backupUri = backupUri;
 			this.stateDescriptors = columnFamilies;
 			this.checkpointId = checkpointId;
+			this.discarded = false;
 		}
 
 		@Override
 		public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
-			try {
-				long startTime = System.currentTimeMillis();
-				HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
-				long endTime = System.currentTimeMillis();
-				LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
-				return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
-			} catch (Exception e) {
-				FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
-				fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
-				throw e;
-			} finally {
-				FileUtils.deleteQuietly(localBackupPath);
+			synchronized (lock) {
+				if (discarded) {
+					throw new Exception("The SemiAsyncSnapshot has already been discarded.");
+				} else {
+					try {
+						long startTime = System.currentTimeMillis();
+						HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
+						long endTime = System.currentTimeMillis();
+
+						LOG.info("RocksDB materialization from {} to {} (asynchronous part) took {} ms.", localBackupPath, backupUri, (endTime - startTime));
+
+						return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
+					} catch (Exception e) {
+						FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
+						fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
+						throw e;
+					} finally {
+						discardState();
+					}
+				}
+			}
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			if (!discarded) {
+				synchronized (lock) {
+					if (!discarded) {
+						discarded = true;
+						if (!FileUtils.deleteQuietly(localBackupPath)) {
+							LOG.warn("Could not delete the local backup file stored at {}.", localBackupPath);
+						}
+					}
+				}
 			}
 		}
 	}
@@ -732,10 +760,13 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		private transient org.rocksdb.Snapshot snapshot;
 		private transient AbstractStateBackend backend;
 
+		private final SerializableObject lock = new SerializableObject();
 		private final URI backupUri;
 		private final Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies;
 		private final long checkpointId;
 
+		private volatile boolean discarded;
+
 		private FullyAsyncSnapshot(org.rocksdb.Snapshot snapshot,
 				AbstractStateBackend backend,
 				URI backupUri,
@@ -746,99 +777,122 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			this.backupUri = backupUri;
 			this.columnFamilies = columnFamilies;
 			this.checkpointId = checkpointId;
+			this.discarded = false;
 		}
 
 		@Override
 		public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
-			long startTime = System.currentTimeMillis();
-			CheckpointStateOutputView outputView;
-
-			try {
-				try {
-					outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);
-				} catch (Exception e) {
-					throw new Exception("Could not create a checkpoint state output view to " +
-						"materialize the checkpoint data into.", e);
-				}
-
-				try {
-					outputView.writeInt(columnFamilies.size());
+			synchronized (lock) {
+				if (discarded) {
+					throw new Exception("FullyAsyncSnapshot has already been discarded.");
+				} else {
+					long startTime = System.currentTimeMillis();
+					CheckpointStateOutputView outputView;
 
-					// we don't know how many key/value pairs there are in each column family.
-					// We prefix every written element with a byte that signifies to which
-					// column family it belongs, this way we can restore the column families
-					byte count = 0;
-					Map<String, Byte> columnFamilyMapping = new HashMap<>();
-					for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
-						columnFamilyMapping.put(column.getKey(), count);
+					try {
+						try {
+							outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);
+						} catch (Exception e) {
+							throw new Exception("Could not create a checkpoint state output view to " +
+								"materialize the checkpoint data into.", e);
+						}
 
-						outputView.writeByte(count);
+						try {
+							outputView.writeInt(columnFamilies.size());
 
-						ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
-						ooOut.writeObject(column.getValue().f1);
-						ooOut.flush();
+							// we don't know how many key/value pairs there are in each column family.
+							// We prefix every written element with a byte that signifies to which
+							// column family it belongs, this way we can restore the column families
+							byte count = 0;
+							Map<String, Byte> columnFamilyMapping = new HashMap<>();
+							for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
+								columnFamilyMapping.put(column.getKey(), count);
 
-						count++;
-					}
+								outputView.writeByte(count);
 
-					ReadOptions readOptions = new ReadOptions();
-					readOptions.setSnapshot(snapshot);
+								ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
+								ooOut.writeObject(column.getValue().f1);
+								ooOut.flush();
 
-					for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
-						byte columnByte = columnFamilyMapping.get(column.getKey());
+								count++;
+							}
 
-						synchronized (dbCleanupLock) {
-							if (db == null) {
-								throw new RuntimeException("RocksDB instance was disposed. This happens " +
-									"when we are in the middle of a checkpoint and the job fails.");
+							ReadOptions readOptions = new ReadOptions();
+							readOptions.setSnapshot(snapshot);
+
+							for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) {
+								byte columnByte = columnFamilyMapping.get(column.getKey());
+
+								synchronized (dbCleanupLock) {
+									if (db == null) {
+										throw new RuntimeException("RocksDB instance was disposed. This happens " +
+											"when we are in the middle of a checkpoint and the job fails.");
+									}
+									RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
+									iterator.seekToFirst();
+									while (iterator.isValid()) {
+										outputView.writeByte(columnByte);
+										BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
+											outputView);
+										BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
+											outputView);
+										iterator.next();
+									}
+								}
 							}
-							RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
-							iterator.seekToFirst();
-							while (iterator.isValid()) {
-								outputView.writeByte(columnByte);
-								BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
-									outputView);
-								BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
-									outputView);
-								iterator.next();
+						} catch (Exception e) {
+							try {
+								// closing the output view deletes the underlying data
+								outputView.close();
+							} catch (Exception closingException) {
+								LOG.warn("Could not close the checkpoint state output view. The " +
+									"written data might not be deleted.", closingException);
 							}
+
+							throw new Exception("Could not write the checkpoint data into the checkpoint " +
+								"state output view.", e);
 						}
+					} finally {
+						discardState();
 					}
-				} catch (Exception e) {
+
+					StateHandle<DataInputView> stateHandle;
+
 					try {
-						// closing the output view deletes the underlying data
-						outputView.close();
-					} catch (Exception closingException) {
-						LOG.warn("Could not close the checkpoint state output view. The " +
-							"written data might not be deleted.", closingException);
+						stateHandle = outputView.closeAndGetHandle();
+					} catch (Exception ioE) {
+						throw new Exception("Could not close the checkpoint state output view and " +
+							"obtain the state handle.", ioE);
 					}
 
-					throw new Exception("Could not write the checkpoint data into the checkpoint " +
-						"state output view.", e);
+					long endTime = System.currentTimeMillis();
+					LOG.info("Fully asynchronous RocksDB materialization to {} (asynchronous part) took {} ms.", backupUri, (endTime - startTime));
+					return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
+				}
+			}
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			if (!discarded) {
+				final Snapshot snapshotToRelease;
+				// claim and clear the snapshot under the lock, so that only the first caller releases it
+				synchronized (lock) {
+					if (discarded) {
+						return;
+					}
+					discarded = true;
+					snapshotToRelease = snapshot;
+					snapshot = null;
 				}
-			} finally {
+
 				synchronized (dbCleanupLock) {
 					if (db != null) {
-						db.releaseSnapshot(snapshot);
+						db.releaseSnapshot(snapshotToRelease);
 					}
 				}
-				snapshot = null;
-			}
-
-			StateHandle<DataInputView> stateHandle;
-
-			try {
-				stateHandle = outputView.closeAndGetHandle();
-			} catch (Exception ioE) {
-				throw new Exception("Could not close the checkpoint state output view and " +
-					"obtain the state handle.", ioE);
 			}
-
-			long endTime = System.currentTimeMillis();
-			LOG.info("Fully asynchronous RocksDB materialization to {} (asynchronous part) took {} ms.", backupUri, (endTime - startTime));
-			return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
 		}
-
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 7227006..516cc1d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -26,10 +26,13 @@ package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 @Internal
 public final class ExceptionUtils {
 
@@ -142,6 +145,53 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
+	 * to a prior exception, or returns the new exception, if no prior exception exists.
+	 *
+	 * <pre>{@code
+	 *
+	 * public void closeAllThings() throws Exception {
+	 *     Exception ex = null;
+	 *     try {
+	 *         component.shutdown();
+	 *     } catch (Exception e) {
+	 *         ex = firstOrSuppressed(e, ex);
+	 *     }
+	 *     try {
+	 *         anotherComponent.stop();
+	 *     } catch (Exception e) {
+	 *         ex = firstOrSuppressed(e, ex);
+	 *     }
+	 *     try {
+	 *         lastComponent.shutdown();
+	 *     } catch (Exception e) {
+	 *         ex = firstOrSuppressed(e, ex);
+	 *     }
+	 *
+	 *     if (ex != null) {
+	 *         throw ex;
+	 *     }
+	 * }
+	 * }</pre>
+	 *
+	 * @param newException The newly occurred exception
+	 * @param previous     The previously occurred exception, possibly null.
+	 *
+	 * @return The new exception, if no previous exception exists, or the previous exception with the
+	 *         new exception in the list of suppressed exceptions.
+	 */
+	public static <T extends Throwable> T firstOrSuppressed(T newException, @Nullable T previous) {
+		checkNotNull(newException, "newException");
+		// Throwable#addSuppressed rejects self-suppression, so the same instance is returned as is
+		if (previous == null || previous == newException) {
+			return newException;
+		} else {
+			previous.addSuppressed(newException);
+			return previous;
+		}
+	}
+
+	/**
 	 * Private constructor to prevent instantiation.
 	 */
 	private ExceptionUtils() {

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 6f185bd..ca35417 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -237,21 +237,23 @@ public class PendingCheckpoint {
 					executor.execute(new Runnable() {
 						@Override
 						public void run() {
-							try {
-								for (TaskState taskState: taskStates.values()) {
+							for (TaskState taskState: taskStates.values()) {
+								try {
 									taskState.discard(userClassLoader);
+								} catch (Exception e) {
+									LOG.warn("Could not properly dispose the task state " +
+										"belonging to vertex {} of checkpoint {} and job {}.",
+										taskState.getJobVertexID(), checkpointId, jobId, e);
 								}
-							} catch (Exception e) {
-								LOG.warn("Could not properly dispose the pending checkpoint " +
-									"{} of job {}.", checkpointId, jobId, e);
 							}
+
+							taskStates.clear();
 						}
 					});
 
 				}
 			} finally {
 				discarded = true;
-				taskStates.clear();
 				notYetAcknowledgedTasks.clear();
 				acknowledgedTasks.clear();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
index c2fc8a4..db9c273 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
@@ -52,11 +52,6 @@ public abstract class AsynchronousKvStateSnapshot<K, N, S extends State, SD exte
 	}
 
 	@Override
-	public void discardState() throws Exception {
-		throw new RuntimeException("This should never be called and probably points to a bug.");
-	}
-
-	@Override
 	public long getStateSize() throws Exception {
 		throw new RuntimeException("This should never be called and probably points to a bug.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
index fee1efe..6c77c16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
@@ -35,9 +35,4 @@ public abstract class AsynchronousStateHandle<T> implements StateHandle<T> {
 	public final T getState(ClassLoader userCodeClassLoader) throws Exception {
 		throw new UnsupportedOperationException("This must not be called. This is likely an internal bug.");
 	}
-
-	@Override
-	public final void discardState() throws Exception {
-		throw new UnsupportedOperationException("This must not be called. This is likely an internal bug.");
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9531974..d7204a9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -642,7 +642,24 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				for (int i = 0; i < states.length; i++) {
 					StreamOperator<?> operator = allOperators[i];
 					if (operator != null) {
-						StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
+						StreamTaskState state;
+						try {
+							state = operator.snapshotOperatorState(checkpointId, timestamp);
+						} catch (Exception exception) {
+							for (int j = 0; j < i; j++) {
+								if (states[j] != null) {
+									try {
+										states[j].discardState();
+									} catch (Exception discardException) {
+										LOG.warn("Could not discard " + j + "th operator state.", discardException);
+									}
+								}
+							}
+
+							throw new Exception("Could not perform the checkpoint for " + i +
+								"th operator in chain.", exception);
+						}
+
 						if (state.getOperatorState() instanceof AsynchronousStateHandle) {
 							hasAsyncStates = true;
 						}
@@ -876,7 +893,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 	// ------------------------------------------------------------------------
 	
-	private static class AsyncCheckpointThread extends Thread implements Closeable {
+	static class AsyncCheckpointThread extends Thread implements Closeable {
 
 		private final StreamTask<?, ?> owner;
 
@@ -900,29 +917,43 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		@Override
 		public void run() {
 			try {
-				for (StreamTaskState state : states) {
+				for (int i = 0; i < states.length; i++) {
+					StreamTaskState state = states[i];
 					if (state != null) {
-						if (state.getFunctionState() instanceof AsynchronousStateHandle) {
-							AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) state.getFunctionState();
-							state.setFunctionState(asyncState.materialize());
-						}
-						if (state.getOperatorState() instanceof AsynchronousStateHandle) {
-							AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) state.getOperatorState();
-							state.setOperatorState(asyncState.materialize());
-						}
-						if (state.getKvStates() != null) {
-							Set<String> keys = state.getKvStates().keySet();
-							HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = state.getKvStates();
-							for (String key: keys) {
-								if (kvStates.get(key) instanceof AsynchronousKvStateSnapshot) {
-									AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key);
-									kvStates.put(key, asyncHandle.materialize());
+						try {
+							if (state.getFunctionState() instanceof AsynchronousStateHandle) {
+								AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) state.getFunctionState();
+								state.setFunctionState(asyncState.materialize());
+							}
+							if (state.getOperatorState() instanceof AsynchronousStateHandle) {
+								AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) state.getOperatorState();
+								state.setOperatorState(asyncState.materialize());
+							}
+							if (state.getKvStates() != null) {
+								Set<String> keys = state.getKvStates().keySet();
+								HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = state.getKvStates();
+								for (String key : keys) {
+									if (kvStates.get(key) instanceof AsynchronousKvStateSnapshot) {
+										AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key);
+										kvStates.put(key, asyncHandle.materialize());
+									}
+								}
+							}
+						} catch (Exception exception) {
+							for (int j = 0; j < states.length; j++) {
+								if (states[j] != null) { // slots may be null, as in the synchronous discard loop
+									try {
+										states[j].discardState();
+									} catch (Exception discardException) {
+										LOG.warn("Could not discard the " + j + "th operator state.", discardException);
+									}
 								}
 							}
-						}

+							throw new Exception("Could not materialize the " + i + "th operator state.", exception);
+						}
 					}
 				}
 				StreamTaskStateList allStates = new StreamTaskStateList(states);
 				owner.lastCheckpointSize = allStates.getStateSize();
 				owner.getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
index 925dd8c..2ffb489 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
@@ -102,12 +102,24 @@ public class StreamTaskState implements Serializable, Closeable {
 		this.operatorState = null;
 		this.functionState = null;
 		this.kvStates = null;
+
+		Exception discardException = null;
	
 		if (operatorState != null) {
-			operatorState.discardState();
+			try {
+				operatorState.discardState();
+			} catch (Exception exception) {
+				discardException = new Exception("Could not discard operator state.", exception);
+			}
 		}
 		if (functionState != null) {
-			functionState.discardState();
+			try {
+				functionState.discardState();
+			} catch (Exception exception) {
+				Exception newException = new Exception("Could not discard function state.", exception);
+				// keep the first failure and attach later ones to it as suppressed exceptions
+				discardException = ExceptionUtils.firstOrSuppressed(newException, discardException);
+			}
 		}
 		if (kvStates != null) {
 			while (kvStates.size() > 0) {
@@ -115,7 +127,13 @@ public class StreamTaskState implements Serializable, Closeable {
 					Iterator<KvStateSnapshot<?, ?, ?, ?, ?>> values = kvStates.values().iterator();
 					while (values.hasNext()) {
 						KvStateSnapshot<?, ?, ?, ?, ?> s = values.next();
-						s.discardState();
+						try {
+							s.discardState();
+						} catch (Exception exception) {
+							Exception newException = new Exception("Could not discard key value state.", exception);
+							// keep the first failure and attach later ones to it as suppressed exceptions
+							discardException = ExceptionUtils.firstOrSuppressed(newException, discardException);
+						}
 						values.remove();
 					}
 				}
@@ -124,6 +142,10 @@ public class StreamTaskState implements Serializable, Closeable {
 				}
 			}
 		}
+		// rethrow only after every state has been given a chance to be discarded
+		if (discardException != null) {
+			throw discardException;
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
index cfaeaad..c7c957f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
@@ -194,6 +194,11 @@ public class StreamTaskAsyncCheckpointTest {
 		}
 
 		@Override
+		public void discardState() throws Exception {
+			// noop
+		}
+
+		@Override
 		public long getStateSize() {
 			return 0;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e8315c7..f2dad46 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -43,8 +43,10 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AsynchronousStateHandle;
 import org.apache.flink.runtime.state.StateBackendFactory;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -53,6 +55,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
@@ -60,6 +63,7 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
+import org.mockito.internal.util.reflection.Whitebox;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -69,12 +73,17 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.net.URL;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import static org.junit.Assert.assertEquals;
@@ -191,6 +200,73 @@ public class StreamTaskTest {
 		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
 	}
 
+	/**
+	 * Tests that all created StreamTaskStates are properly cleaned up when a snapshotting method
+	 * of an operator fails.
+	 */
+	@Test
+	public void testStateCleanupWhenFailingCheckpoint() throws Exception {
+		final long checkpointId = 1L;
+		final long timestamp = 42L;
+
+		StreamTask<Integer, StreamOperator<Integer>> streamTask = new TestingStreamTask();
+		streamTask.setEnvironment(new DummyEnvironment("test task", 1, 0));
+
+		OperatorChain<Integer> operatorChain = mock(OperatorChain.class);
+
+		StreamOperator<Integer> firstOperator = mock(StreamOperator.class);
+		StreamTaskState firstStreamTaskState = mock(StreamTaskState.class);
+		StreamOperator<Integer> secondOperator = mock(StreamOperator.class);
+
+		doReturn(firstStreamTaskState).when(firstOperator).snapshotOperatorState(anyLong(), anyLong());
+		doThrow(new Exception("Test Exception")).when(secondOperator).snapshotOperatorState(anyLong(), anyLong());
+
+		doReturn(new StreamOperator<?>[]{firstOperator, secondOperator}).when(operatorChain).getAllOperators();
+
+		Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+		Whitebox.setInternalState(streamTask, "isRunning", true);
+
+		try {
+			streamTask.triggerCheckpoint(checkpointId, timestamp);
+			fail("Expected exception here.");
+		} catch (Exception expected) {
+			// expected failing trigger checkpoint here
+		}
+
+		verify(firstStreamTaskState).discardState();
+	}
+
+	/**
+	 * Tests that the AsyncCheckpointThread discards the given StreamTaskStates in case a failure
+	 * occurs while materializing the asynchronous state handles.
+	 */
+	@Test
+	public void testAsyncCheckpointThreadStateCleanup() throws Exception {
+		final long checkpointId = 1L;
+		StreamTaskState firstState = mock(StreamTaskState.class);
+		StreamTaskState secondState = mock(StreamTaskState.class);
+		AsynchronousStateHandle<Integer> functionStateHandle = mock(AsynchronousStateHandle.class);
+
+		doReturn(functionStateHandle).when(firstState).getFunctionState();
+		doThrow(new Exception("Test exception")).when(functionStateHandle).materialize();
+
+		StreamTask<Integer, StreamOperator<Integer>> owner = mock(StreamTask.class);
+		StreamTaskState[] states = {firstState, secondState};
+
+		StreamTask.AsyncCheckpointThread asyncCheckpointThread = new StreamTask.AsyncCheckpointThread(
+			"AsyncCheckpointThread",
+			owner,
+			new HashSet<Closeable>(),
+			states,
+			checkpointId);
+
+		asyncCheckpointThread.run();
+
+		for (StreamTaskState streamTaskState : states) {
+			verify(streamTaskState).discardState();
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Test Utilities
 	// ------------------------------------------------------------------------
@@ -489,4 +565,33 @@ public class StreamTaskTest {
 			interrupt();
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Testing class for StreamTask methods
+	 */
+	private static class TestingStreamTask extends StreamTask<Integer, StreamOperator<Integer>> {
+
+		@Override
+		protected void init() throws Exception {
+
+		}
+
+		@Override
+		protected void run() throws Exception {
+
+		}
+
+		@Override
+		protected void cleanup() throws Exception {
+
+		}
+
+		@Override
+		protected void cancelTask() throws Exception {
+
+		}
+	}
 }