Posted to commits@flink.apache.org by tr...@apache.org on 2017/01/23 16:33:11 UTC

[1/2] flink git commit: [FLINK-5229] [state] Cleanup of operator snapshots if subsequent operator snapshots fail

Repository: flink
Updated Branches:
  refs/heads/master 5d0d279dc -> cfb95b907


[FLINK-5229] [state] Cleanup of operator snapshots if subsequent operator snapshots fail

This PR adds operator state cleanup to the StreamTask class. If a stream task contains multiple
stream operators, then every operator is checkpointed. If one of these snapshot operations fails,
all state handles and OperatorSnapshotResults belonging to the previously checkpointed operators have to be freed.
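
For illustration, the cleanup added to StreamTask's asynchronous checkpointing path roughly follows
the shape sketched below. This is a simplified, standalone sketch and not the actual StreamTask code:
the Flink types used (OperatorSnapshotResult, StreamStateHandle, StateUtil, ExceptionUtils) are the
ones touched by this change, but the wrapping class, method signature and parameter names are illustrative.

// Sketch only: cancels every in-progress operator snapshot and discards the
// already materialized non-partitioned state handles. The first failure is
// kept and later failures are attached as suppressed exceptions.
package example; // hypothetical package

import java.util.List;

import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.util.ExceptionUtils;

final class CheckpointCleanupSketch {

	static void cleanup(
			List<OperatorSnapshotResult> snapshotsInProgress,
			List<StreamStateHandle> nonPartitionedStateHandles) throws Exception {

		Exception exception = null;

		// cancel all ongoing operator snapshots
		for (OperatorSnapshotResult snapshotResult : snapshotsInProgress) {
			if (snapshotResult != null) {
				try {
					snapshotResult.cancel();
				} catch (Exception cancelException) {
					exception = ExceptionUtils.firstOrSuppressed(cancelException, exception);
				}
			}
		}

		// discard the non partitioned state handles that were already written
		try {
			StateUtil.bestEffortDiscardAllStateObjects(nonPartitionedStateHandles);
		} catch (Exception discardException) {
			exception = ExceptionUtils.firstOrSuppressed(discardException, exception);
		}

		if (exception != null) {
			throw exception;
		}
	}
}

Collecting failures with firstOrSuppressed ensures that the first failure is reported while later
cleanup failures are still visible as suppressed exceptions.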

Add test cases for failing checkpoint operations in StreamTask

Address PR comments

This closes #3179.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfb95b90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfb95b90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfb95b90

Branch: refs/heads/master
Commit: cfb95b9074a05686f5ad290b2aaa4be89536a35b
Parents: e458975
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 20 14:28:44 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 23 17:32:49 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/state/StateUtil.java   |  21 +++
 .../api/operators/OperatorSnapshotResult.java   |  22 +--
 .../streaming/runtime/tasks/StreamTask.java     |  56 +++++-
 .../streaming/runtime/tasks/StreamTaskTest.java | 182 ++++++++++++++++++-
 4 files changed, 251 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cfb95b90/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index a4799bf..19afdec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.util.FutureUtil;
+
+import java.util.concurrent.RunnableFuture;
+
 /**
  * Helpers for {@link StateObject} related code.
  */
@@ -61,4 +65,21 @@ public class StateUtil {
 			}
 		}
 	}
+
+	/**
+	 * Discards the given state future by first trying to cancel it. If this is not possible, then
+	 * the state object contained in the future is calculated and afterwards discarded.
+	 *
+	 * @param stateFuture to be discarded
+	 * @throws Exception if the discard operation failed
+	 */
+	public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture) throws Exception {
+		if (null != stateFuture) {
+			if (!stateFuture.cancel(true)) {
+				StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
+
+				stateObject.discardState();
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb95b90/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 913928f..5a6c37b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -20,9 +20,8 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FutureUtil;
 
 import java.util.concurrent.RunnableFuture;
 
@@ -87,7 +86,7 @@ public class OperatorSnapshotResult {
 		Exception exception = null;
 
 		try {
-			cancelIfNotNull(getKeyedStateManagedFuture());
+			StateUtil.discardStateFuture(getKeyedStateManagedFuture());
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(
 				new Exception("Could not properly cancel managed keyed state future.", e),
@@ -95,7 +94,7 @@ public class OperatorSnapshotResult {
 		}
 
 		try {
-			cancelIfNotNull(getOperatorStateManagedFuture());
+			StateUtil.discardStateFuture(getOperatorStateManagedFuture());
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(
 				new Exception("Could not properly cancel managed operator state future.", e),
@@ -103,7 +102,7 @@ public class OperatorSnapshotResult {
 		}
 
 		try {
-			cancelIfNotNull(getKeyedStateRawFuture());
+			StateUtil.discardStateFuture(getKeyedStateRawFuture());
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(
 				new Exception("Could not properly cancel raw keyed state future.", e),
@@ -111,7 +110,7 @@ public class OperatorSnapshotResult {
 		}
 
 		try {
-			cancelIfNotNull(getOperatorStateRawFuture());
+			StateUtil.discardStateFuture(getOperatorStateRawFuture());
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(
 				new Exception("Could not properly cancel raw operator state future.", e),
@@ -122,15 +121,4 @@ public class OperatorSnapshotResult {
 			throw exception;
 		}
 	}
-
-	private static <T extends StreamStateHandle> void cancelIfNotNull(RunnableFuture<T> future) throws Exception {
-		if (null != future) {
-			if (!future.cancel(true)) {
-				// the cancellation was not successful because it might have been completed before
-				StreamStateHandle streamStateHandle = FutureUtil.runIfNotDoneAndGet(future);
-
-				streamStateHandle.discardState();
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb95b90/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 625e279..7647fbb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -531,7 +532,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// propagate exceptions only if the task is still in "running" state
 			if (isRunning) {
 				throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
-					"for operator " + getName() + '.', e);
+					" for operator " + getName() + '.', e);
 			} else {
 				LOG.debug("Could not perform checkpoint {} for operator {} while the " +
 					"invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
@@ -954,12 +955,19 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 							owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
 				}
 			} catch (Exception e) {
+				try {
+					cleanup();
+				} catch (Exception cleanupException) {
+					e.addSuppressed(cleanupException);
+				}
+
 				// registers the exception and tries to fail the whole task
 				AsynchronousException asyncException = new AsynchronousException(
 					new Exception(
 						"Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() +
 							" for operator " + owner.getName() + '.',
 						e));
+
 				owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
 			} finally {
 				owner.cancelables.unregisterClosable(this);
@@ -968,17 +976,37 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		@Override
 		public void close() {
-			// cleanup/release ongoing snapshot operations
-			for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) {
-				if (null != snapshotResult) {
+			try {
+				cleanup();
+			} catch (Exception cleanupException) {
+				LOG.warn("Could not properly clean up the async checkpoint runnable.", cleanupException);
+			}
+		}
+
+		private void cleanup() throws Exception {
+			Exception exception = null;
+
+			// clean up ongoing operator snapshot results and non partitioned state handles
+			for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
+				if (operatorSnapshotResult != null) {
 					try {
-						snapshotResult.cancel();
-					} catch (Exception e) {
-						LOG.warn("Could not properly cancel operator snapshot result in async " +
-							"checkpoint runnable.", e);
+						operatorSnapshotResult.cancel();
+					} catch (Exception cancelException) {
+						exception = ExceptionUtils.firstOrSuppressed(cancelException, exception);
 					}
 				}
 			}
+
+			// discard non partitioned state handles
+			try {
+				StateUtil.bestEffortDiscardAllStateObjects(nonPartitionedStateHandles);
+			} catch (Exception discardException) {
+				exception = ExceptionUtils.firstOrSuppressed(discardException, exception);
+			}
+
+			if (null != exception) {
+				throw exception;
+			}
 		}
 	}
 
@@ -1058,6 +1086,18 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 						}
 					}
 
+					// Cleanup non partitioned state handles
+					for (StreamStateHandle nonPartitionedState : nonPartitionedStates) {
+						if (nonPartitionedState != null) {
+							try {
+								nonPartitionedState.discardState();
+							} catch (Exception e) {
+								LOG.warn("Could not properly discard a non partitioned " +
+									"state. This might leave some orphaned files behind.", e);
+							}
+						}
+					}
+
 					if (LOG.isDebugEnabled()) {
 						LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
 								"Alignment duration: {} ms, snapshot duration {} ms",

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb95b90/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index ffacd3f..f5990ca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -22,13 +22,16 @@ import akka.dispatch.Futures;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -52,30 +55,38 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import scala.concurrent.Await;
@@ -91,7 +102,9 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.PriorityQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -99,10 +112,14 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
 
-public class StreamTaskTest {
+public class StreamTaskTest extends TestLogger {
 
 	private static OneShotLatch SYNC_LATCH;
 
@@ -171,8 +188,8 @@ public class StreamTaskTest {
 		task.getExecutingThread().join();
 
 		// ensure that the state backends are closed
-		Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
-		Mockito.verify(StateBackendTestSource.keyedStateBackend).close();
+		verify(StateBackendTestSource.operatorStateBackend).close();
+		verify(StateBackendTestSource.keyedStateBackend).close();
 
 		assertEquals(ExecutionState.FINISHED, task.getExecutionState());
 	}
@@ -195,8 +212,8 @@ public class StreamTaskTest {
 		task.getExecutingThread().join();
 
 		// ensure that the state backends are closed
-		Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
-		Mockito.verify(StateBackendTestSource.keyedStateBackend).close();
+		verify(StateBackendTestSource.operatorStateBackend).close();
+		verify(StateBackendTestSource.keyedStateBackend).close();
 
 		assertEquals(ExecutionState.FAILED, task.getExecutionState());
 	}
@@ -241,6 +258,161 @@ public class StreamTaskTest {
 		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
 	}
 
+	@Test
+	public void testFailingCheckpointStreamOperator() throws Exception {
+		final long checkpointId = 42L;
+		final long timestamp = 1L;
+
+		TaskInfo mockTaskInfo = mock(TaskInfo.class);
+		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+		Environment mockEnvironment = mock(Environment.class);
+		when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+
+		StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+		streamTask.setEnvironment(mockEnvironment);
+
+		StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+		StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+		StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+		OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
+		OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
+
+		final Exception testException = new Exception("Test exception");
+
+		when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
+		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
+		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenThrow(testException);
+
+		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
+
+		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
+		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
+		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+		when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
+		when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
+		when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+
+		CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
+		when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(
+			outStream1, outStream2, outStream3);
+
+		AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
+		when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+
+		Whitebox.setInternalState(streamTask, "isRunning", true);
+		Whitebox.setInternalState(streamTask, "lock", new Object());
+		Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+
+		try {
+			streamTask.triggerCheckpoint(checkpointMetaData);
+			fail("Expected test exception here.");
+		} catch (Exception e) {
+			assertEquals(testException, e.getCause());
+		}
+
+		verify(operatorSnapshotResult1).cancel();
+		verify(operatorSnapshotResult2).cancel();
+
+		verify(streamStateHandle1).discardState();
+		verify(streamStateHandle2).discardState();
+		verify(streamStateHandle3).discardState();
+	}
+
+	/**
+	 * Tests that in case of a failing AsyncCheckpointRunnable all operator snapshot results are
+	 * cancelled and all non partitioned state handles are discarded.
+	 */
+	@Test
+	public void testFailingAsyncCheckpointRunnable() throws Exception {
+		final long checkpointId = 42L;
+		final long timestamp = 1L;
+
+		TaskInfo mockTaskInfo = mock(TaskInfo.class);
+		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+		Environment mockEnvironment = mock(Environment.class);
+		when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+
+		StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+		streamTask.setEnvironment(mockEnvironment);
+
+		StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+		StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+		StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+		OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
+		OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
+		OperatorSnapshotResult operatorSnapshotResult3 = mock(OperatorSnapshotResult.class);
+
+		RunnableFuture<OperatorStateHandle> failingFuture = mock(RunnableFuture.class);
+		when(failingFuture.get()).thenThrow(new ExecutionException(new Exception("Test exception")));
+
+		when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
+
+		when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
+		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
+		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult3);
+
+		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
+
+		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
+		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
+		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+		when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
+		when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
+		when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+
+		CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
+		when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(
+			outStream1, outStream2, outStream3);
+
+		AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
+		when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+
+		Whitebox.setInternalState(streamTask, "isRunning", true);
+		Whitebox.setInternalState(streamTask, "lock", new Object());
+		Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService());
+		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+
+		streamTask.triggerCheckpoint(checkpointMetaData);
+
+		verify(streamTask).handleAsyncException(anyString(), any(Throwable.class));
+
+		verify(operatorSnapshotResult1).cancel();
+		verify(operatorSnapshotResult2).cancel();
+		verify(operatorSnapshotResult3).cancel();
+
+		verify(streamStateHandle1).discardState();
+		verify(streamStateHandle2).discardState();
+		verify(streamStateHandle3).discardState();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Test Utilities
 	// ------------------------------------------------------------------------


[2/2] flink git commit: [FLINK-5214] Clean up checkpoint data in case of a failing checkpoint operation

Posted by tr...@apache.org.
[FLINK-5214] Clean up checkpoint data in case of a failing checkpoint operation

Adds exception handling to the stream operators' snapshotState method. A failing snapshot
operation triggers the cleanup of all state resources generated so far, so that a failed
snapshot operation does not leave resources (e.g. files) behind.
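
For illustration, the guard added around the operator snapshot in AbstractStreamOperator roughly
follows the pattern sketched below. This is a simplified sketch under the assumption of the
signatures shown in the diff; the wrapping class and method are hypothetical, and the real method
additionally snapshots the managed keyed and operator state backends.

// Sketch only: the snapshot context is now Closeable, so try-with-resources
// always closes and unregisters its raw checkpoint output streams; on failure
// the partially built OperatorSnapshotResult is cancelled to discard its state.
package example; // hypothetical package

import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;

final class GuardedSnapshotSketch {

	static OperatorSnapshotResult guardedSnapshot(
			long checkpointId,
			long timestamp,
			CheckpointStreamFactory streamFactory,
			KeyGroupRange keyGroupRange,
			CloseableRegistry cancelables) throws Exception {

		OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();

		try (StateSnapshotContextSynchronousImpl snapshotContext =
				new StateSnapshotContextSynchronousImpl(
					checkpointId, timestamp, streamFactory, keyGroupRange, cancelables)) {

			// ... operator-specific snapshot logic would run here ...

			snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
			snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
		} catch (Exception snapshotException) {
			try {
				// discard the state the partially completed snapshot already produced
				snapshotInProgress.cancel();
			} catch (Exception cancelException) {
				snapshotException.addSuppressed(cancelException);
			}
			throw new Exception(
				"Could not complete snapshot " + checkpointId + '.', snapshotException);
		}

		return snapshotInProgress;
	}
}

Cancelling the OperatorSnapshotResult in the catch block is what discards any state that was
already written before the failure, which is the behaviour the tests added by this change verify.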

Add test case for OperatorSnapshotResult

Add StateSnapshotContextSynchronousImplTest

Add AbstractStreamOperator failing snapshot tests

This closes #3178.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4589757
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4589757
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4589757

Branch: refs/heads/master
Commit: e458975756e137ae2abb94e09fd92578ecd739bc
Parents: 5d0d279
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Dec 1 13:25:05 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 23 17:32:49 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  12 +-
 .../state/RocksDBAsyncSnapshotTest.java         |  77 ++++++++-
 .../runtime/state/StateSnapshotContext.java     |   2 +-
 .../StateSnapshotContextSynchronousImpl.java    |  41 ++++-
 .../filesystem/FsCheckpointStreamFactory.java   |  67 +++++---
 .../FsCheckpointStateOutputStreamTest.java      |  73 +++++++++
 .../source/ContinuousFileReaderOperator.java    |  15 +-
 .../functions/util/StreamingFunctionUtils.java  |  11 +-
 .../api/operators/AbstractStreamOperator.java   |  92 ++++++++---
 .../api/operators/OperatorSnapshotResult.java   |  56 ++++++-
 .../api/operators/async/AsyncWaitOperator.java  |  21 ++-
 .../operators/GenericWriteAheadSink.java        |  14 +-
 .../streaming/runtime/tasks/StreamTask.java     |  42 +++--
 .../operators/AbstractStreamOperatorTest.java   | 157 ++++++++++++++++++-
 .../operators/OperatorSnapshotResultTest.java   |  97 +++++-------
 ...StateSnapshotContextSynchronousImplTest.java |  73 ++++++++-
 16 files changed, 698 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 21ef8c2..dccf3ac 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -286,8 +286,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 							}
 						}
 
-						LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", asynchronous part) in thread " +
-								Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms.");
+						LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
+							streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
 
 						return snapshotOperation.getSnapshotResultStateHandle();
 					}
@@ -348,7 +348,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @param checkpointId id of the checkpoint for which we take the snapshot
 		 * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot
 		 */
-		public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) throws IOException {
+		public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) {
 			Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
 			this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size());
 			this.checkpointId = checkpointId;
@@ -429,8 +429,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					if (null != snapshotResultStateHandle) {
 						snapshotResultStateHandle.discardState();
 					}
-				} catch (Exception ignored) {
-					LOG.warn("Exception occurred during snapshot state handle cleanup: " + ignored);
+				} catch (Exception e) {
+					LOG.warn("Exception occurred during snapshot state handle cleanup.", e);
 				}
 			}
 		}
@@ -454,7 +454,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			return snapshotResultStateHandle;
 		}
 
-		private void writeKVStateMetaData() throws IOException, InterruptedException {
+		private void writeKVStateMetaData() throws IOException {
 
 			List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
 					new ArrayList<>(stateBackend.kvStateInformation.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 70f74b0..46a184a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -31,8 +32,14 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
@@ -47,6 +54,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.FutureUtil;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -69,10 +77,21 @@ import java.net.URI;
 import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for asynchronous RocksDB Key/Value state checkpoints.
@@ -182,7 +201,7 @@ public class RocksDBAsyncSnapshotTest {
 
 		testHarness.waitForTaskCompletion();
 		if (mockEnv.wasFailedExternally()) {
-			Assert.fail("Unexpected exception during execution.");
+			fail("Unexpected exception during execution.");
 		}
 	}
 
@@ -259,7 +278,7 @@ public class RocksDBAsyncSnapshotTest {
 			if (mockEnv.wasFailedExternally()) {
 				throw new AsynchronousException(new InterruptedException("Exception was thrown as expected."));
 			}
-			Assert.fail("Operation completed. Cancel failed.");
+			fail("Operation completed. Cancel failed.");
 		} catch (Exception expected) {
 			AsynchronousException asynchronousException = null;
 
@@ -268,7 +287,7 @@ public class RocksDBAsyncSnapshotTest {
 			} else if (expected.getCause() instanceof AsynchronousException) {
 				asynchronousException = (AsynchronousException) expected.getCause();
 			} else {
-				Assert.fail("Unexpected exception: " + expected);
+				fail("Unexpected exception: " + expected);
 			}
 
 			// we expect the exception from canceling snapshots
@@ -279,6 +298,58 @@ public class RocksDBAsyncSnapshotTest {
 		}
 	}
 
+	/**
+	 * Test that the snapshot files are cleaned up in case of a failure during the snapshot
+	 * procedure.
+	 */
+	@Test
+	public void testCleanupOfSnapshotsInFailureCase() throws Exception {
+		long checkpointId = 1L;
+		long timestamp = 42L;
+
+		Environment env = new DummyEnvironment("test task", 1, 0);
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outputStream = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		CheckpointStreamFactory checkpointStreamFactory = mock(CheckpointStreamFactory.class);
+		AbstractStateBackend stateBackend = mock(AbstractStateBackend.class);
+
+		final IOException testException = new IOException("Test exception");
+
+		doReturn(checkpointStreamFactory).when(stateBackend).createStreamFactory(any(JobID.class), anyString());
+		doThrow(testException).when(outputStream).write(anyInt());
+		doReturn(outputStream).when(checkpointStreamFactory).createCheckpointStateOutputStream(eq(checkpointId), eq(timestamp));
+
+		RocksDBStateBackend backend = new RocksDBStateBackend(stateBackend);
+
+		backend.setDbStoragePath("file:///tmp/foobar");
+
+		AbstractKeyedStateBackend<Void> keyedStateBackend = backend.createKeyedStateBackend(
+			env,
+			new JobID(),
+			"test operator",
+			VoidSerializer.INSTANCE,
+			1,
+			new KeyGroupRange(0, 0),
+			null);
+
+		// register a state so that the state backend has to checkpoint something
+		keyedStateBackend.getPartitionedState(
+			"namespace",
+			StringSerializer.INSTANCE,
+			new ValueStateDescriptor<>("foobar", String.class));
+
+		RunnableFuture<KeyGroupsStateHandle> snapshotFuture = keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory);
+
+		try {
+			FutureUtil.runIfNotDoneAndGet(snapshotFuture);
+			fail("Expected an exception to be thrown here.");
+		} catch (ExecutionException e) {
+			Assert.assertEquals(testException, e.getCause());
+		}
+
+		verify(outputStream).close();
+	}
+
 	@Test
 	public void testConsistentSnapshotSerializationFlagsAndMasks() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java
index 4dbbeaf..e5a748b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java
@@ -37,4 +37,4 @@ public interface StateSnapshotContext extends FunctionSnapshotContext {
 	 */
 	OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception;
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index ce8a6c4..96edccb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -20,15 +20,17 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.RunnableFuture;
 
 /**
  * This class is a default implementation for StateSnapshotContext.
  */
-public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext {
+public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext, Closeable {
 	
 	private final long checkpointId;
 	private final long checkpointTimestamp;
@@ -127,4 +129,39 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 		return new DoneFuture<>(stream.closeAndGetHandle());
 	}
 
-}
\ No newline at end of file
+	private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
+		Preconditions.checkNotNull(stream);
+
+		closableRegistry.unregisterClosable(stream.getDelegate());
+		stream.getDelegate().close();
+	}
+
+	@Override
+	public void close() throws IOException {
+		IOException exception = null;
+
+		if (keyedStateCheckpointOutputStream != null) {
+			try {
+				closeAndUnregisterStream(keyedStateCheckpointOutputStream);
+			} catch (IOException e) {
+				exception = ExceptionUtils.firstOrSuppressed(
+					new IOException("Could not close the raw keyed state checkpoint output stream.", e),
+					exception);
+			}
+		}
+
+		if (operatorStateCheckpointOutputStream != null) {
+			try {
+				closeAndUnregisterStream(operatorStateCheckpointOutputStream);
+			} catch (IOException e) {
+				exception = ExceptionUtils.firstOrSuppressed(
+					new IOException("Could not close the raw operator state checkpoint output stream.", e),
+					exception);
+			}
+		}
+
+		if (exception != null) {
+			throw exception;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 1be3abf..30b1da6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -266,17 +266,21 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 				if (outStream != null) {
 					try {
 						outStream.close();
-						fs.delete(statePath, false);
-
+					} catch (Throwable throwable) {
+						LOG.warn("Could not close the state stream for {}.", statePath, throwable);
+					} finally {
 						try {
-							FileUtils.deletePathIfEmpty(fs, basePath);
-						} catch (Exception ignored) {
-							LOG.debug("Could not delete the parent directory {}.", basePath, ignored);
+							fs.delete(statePath, false);
+
+							try {
+								FileUtils.deletePathIfEmpty(fs, basePath);
+							} catch (Exception ignored) {
+								LOG.debug("Could not delete the parent directory {}.", basePath, ignored);
+							}
+						} catch (Exception e) {
+							LOG.warn("Cannot delete closed and discarded state stream for {}.", statePath, e);
 						}
 					}
-					catch (Exception e) {
-						LOG.warn("Cannot delete closed and discarded state stream for " + statePath, e);
-					}
 				}
 			}
 		}
@@ -297,20 +301,41 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 						return new ByteStreamStateHandle(createStatePath().toString(), bytes);
 					}
 					else {
-						flush();
-
-						closed = true;
-						pos = writeBuffer.length;
-
-						long size = -1;
-						// make a best effort attempt to figure out the size
 						try {
-							size = outStream.getPos();
-						} catch (Exception ignored) {}
-
-						outStream.close();
-
-						return new FileStateHandle(statePath, size);
+							flush();
+
+							pos = writeBuffer.length;
+						
+							long size = -1L;
+
+							// make a best effort attempt to figure out the size
+							try {
+								size = outStream.getPos();
+							} catch (Exception ignored) {}
+
+							outStream.close();
+
+							return new FileStateHandle(statePath, size);
+						} catch (Exception exception) {
+							try {
+								fs.delete(statePath, false);
+
+								try {
+									FileUtils.deletePathIfEmpty(fs, basePath);
+								} catch (Exception parentDirDeletionFailure) {
+									LOG.debug("Could not delete the parent directory {}.", basePath, parentDirDeletionFailure);
+								}
+							} catch (Exception deleteException) {
+								LOG.warn("Could not delete the checkpoint stream file {}.",
+									statePath, deleteException);
+							}
+
+							throw new IOException("Could not flush and close the file system " +
+								"output stream to " + statePath + " in order to obtain the " +
+								"stream state handle", exception);
+						} finally {
+							closed = true;
+						}
 					}
 				}
 				else {

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index 6d371b1..8617193 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -18,14 +18,17 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 import java.io.DataInputStream;
 import java.io.File;
@@ -37,6 +40,13 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class FsCheckpointStateOutputStreamTest {
 
@@ -108,6 +118,69 @@ public class FsCheckpointStateOutputStreamTest {
 		stream.closeAndGetHandle();
 	}
 
+	/**
+	 * Tests that the underlying stream file is deleted upon calling close.
+	 */
+	@Test
+	public void testCleanupWhenClosingStream() throws IOException {
+
+		final FileSystem fs = mock(FileSystem.class);
+		final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
+
+		final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+		when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream);
+
+		CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
+			TEMP_DIR_PATH,
+			fs,
+			4,
+			0);
+
+		// this should create the underlying file stream
+		stream.write(new byte[]{1,2,3,4,5});
+
+		verify(fs).create(any(Path.class), anyBoolean());
+
+		stream.close();
+
+		verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
+	}
+
+	/**
+	 * Tests that the underlying stream file is deleted if the closeAndGetHandle method fails.
+	 */
+	@Test
+	public void testCleanupWhenFailingCloseAndGetHandle() throws IOException {
+		final FileSystem fs = mock(FileSystem.class);
+		final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
+
+		final ArgumentCaptor<Path>  pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+		when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream);
+		doThrow(new IOException("Test IOException.")).when(outputStream).close();
+
+		CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
+			TEMP_DIR_PATH,
+			fs,
+			4,
+			0);
+
+		// this should create the underlying file stream
+		stream.write(new byte[]{1,2,3,4,5});
+
+		verify(fs).create(any(Path.class), anyBoolean());
+
+		try {
+			stream.closeAndGetHandle();
+			fail("Expected IOException");
+		} catch (IOException ioE) {
+			// expected exception
+		}
+
+		verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
+	}
+
 	private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
 		FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
 			new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 6419aa6..ab1ad1d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -394,10 +394,19 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
 
 		checkpointedState.clear();
+
 		List<TimestampedFileInputSplit> readerState = reader.getReaderState();
-		for (TimestampedFileInputSplit split : readerState) {
-			// create a new partition for each entry.
-			checkpointedState.add(split);
+
+		try {
+			for (TimestampedFileInputSplit split : readerState) {
+				// create a new partition for each entry.
+				checkpointedState.add(split);
+			}
+		} catch (Exception e) {
+			checkpointedState.clear();
+
+			throw new Exception("Could not add timestamped file input splits to to operator " +
+				"state backend of operator " + getOperatorName() + '.', e);
 		}
 
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
index d1d264f..679ef0b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
@@ -131,8 +131,15 @@ public final class StreamingFunctionUtils {
 			listState.clear();
 
 			if (null != partitionableState) {
-				for (Serializable statePartition : partitionableState) {
-					listState.add(statePartition);
+				try {
+					for (Serializable statePartition : partitionableState) {
+						listState.add(statePartition);
+					}
+				} catch (Exception e) {
+					listState.clear();
+
+					throw new Exception("Could not write partitionable state to operator " +
+						"state backend.", e);
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 6bb0a40..05f2ed5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -339,24 +339,38 @@ public abstract class AbstractStreamOperator<OUT>
 		KeyGroupRange keyGroupRange = null != keyedStateBackend ?
 				keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
 
-		StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
-				checkpointId, timestamp, streamFactory, keyGroupRange, getContainingTask().getCancelables());
+		OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
 
-		snapshotState(snapshotContext);
+		try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
+				checkpointId,
+				timestamp,
+				streamFactory,
+				keyGroupRange,
+				getContainingTask().getCancelables())) {
 
-		OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
+			snapshotState(snapshotContext);
 
-		snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
-		snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
+			snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
+			snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
 
-		if (null != operatorStateBackend) {
-			snapshotInProgress.setOperatorStateManagedFuture(
+			if (null != operatorStateBackend) {
+				snapshotInProgress.setOperatorStateManagedFuture(
 					operatorStateBackend.snapshot(checkpointId, timestamp, streamFactory));
-		}
+			}
 
-		if (null != keyedStateBackend) {
-			snapshotInProgress.setKeyedStateManagedFuture(
+			if (null != keyedStateBackend) {
+				snapshotInProgress.setKeyedStateManagedFuture(
 					keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory));
+			}
+		}  catch (Exception snapshotException) {
+			try {
+				snapshotInProgress.cancel();
+			} catch (Exception e) {
+				snapshotException.addSuppressed(e);
+			}
+
+			throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
+				getOperatorName() + '.', snapshotException);
 		}
 
 		return snapshotInProgress;
@@ -369,21 +383,40 @@ public abstract class AbstractStreamOperator<OUT>
 	 */
 	public void snapshotState(StateSnapshotContext context) throws Exception {
 		if (getKeyedStateBackend() != null) {
-			KeyedStateCheckpointOutputStream out = context.getRawKeyedOperatorStateOutput();
+			KeyedStateCheckpointOutputStream out;
 
-			KeyGroupsList allKeyGroups = out.getKeyGroupList();
-			for (int keyGroupIdx : allKeyGroups) {
-				out.startNewKeyGroup(keyGroupIdx);
+			try {
+				out = context.getRawKeyedOperatorStateOutput();
+			} catch (Exception exception) {
+				throw new Exception("Could not open raw keyed operator state stream for " +
+					getOperatorName() + '.', exception);
+			}
 
-				DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
-				dov.writeInt(timerServices.size());
+			try {
+				KeyGroupsList allKeyGroups = out.getKeyGroupList();
+				for (int keyGroupIdx : allKeyGroups) {
+					out.startNewKeyGroup(keyGroupIdx);
 
-				for (Map.Entry<String, HeapInternalTimerService<?, ?>> entry : timerServices.entrySet()) {
-					String serviceName = entry.getKey();
-					HeapInternalTimerService<?, ?> timerService = entry.getValue();
+					DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
+					dov.writeInt(timerServices.size());
 
-					dov.writeUTF(serviceName);
-					timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
+					for (Map.Entry<String, HeapInternalTimerService<?, ?>> entry : timerServices.entrySet()) {
+						String serviceName = entry.getKey();
+						HeapInternalTimerService<?, ?> timerService = entry.getValue();
+
+						dov.writeUTF(serviceName);
+						timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
+					}
+				}
+			} catch (Exception exception) {
+				throw new Exception("Could not write timer service of " + getOperatorName() +
+					" to checkpoint state stream.", exception);
+			} finally {
+				try {
+					out.close();
+				} catch (Exception closeException) {
+					LOG.warn("Could not close raw keyed operator state stream for {}. This " +
+						"might have prevented deleting some state data.", getOperatorName(), closeException);
 				}
 			}
 		}
@@ -457,6 +490,21 @@ public abstract class AbstractStreamOperator<OUT>
 	public ClassLoader getUserCodeClassloader() {
 		return container.getUserCodeClassLoader();
 	}
+
+	/**
+	 * Return the operator name. If the runtime context has been set, then the task name with
+	 * subtask index is returned. Otherwise, the simple class name is returned.
+	 *
+	 * @return If runtime context is set, then return task name with subtask index. Otherwise return
+	 * 			simple class name.
+	 */
+	protected String getOperatorName() {
+		if (runtimeContext != null) {
+			return runtimeContext.getTaskNameWithSubtasks();
+		} else {
+			return getClass().getSimpleName();
+		}
+	}
 	
 	/**
 	 * Returns a context that allows the operator to query information about the execution and also
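
Editor's note: the hunks above follow one cleanup idiom consistently - if a later snapshot step fails, the partially populated result is cancelled, any failure during that cleanup is attached as a suppressed exception so the original cause stays primary, and raw state streams are closed in a finally block. Below is a minimal, self-contained sketch of that idiom; the PartialSnapshot type is hypothetical and only stands in for the Flink classes involved.

final class SnapshotCleanupSketch {

	/** Hypothetical stand-in for an object that acquires resources step by step. */
	interface PartialSnapshot {
		void populate() throws Exception; // may fail after some resources were acquired
		void cancel() throws Exception;   // releases whatever was already acquired
	}

	static void snapshotWithCleanup(PartialSnapshot snapshot, String operatorName) throws Exception {
		try {
			snapshot.populate();
		} catch (Exception snapshotException) {
			try {
				// free the resources that earlier steps already acquired
				snapshot.cancel();
			} catch (Exception cleanupException) {
				// keep the original failure as the primary exception
				snapshotException.addSuppressed(cleanupException);
			}
			throw new Exception("Could not complete snapshot for operator " + operatorName + '.',
				snapshotException);
		}
	}
}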

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 8b9f758..913928f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -20,8 +20,10 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FutureUtil;
 
-import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
 
 /**
@@ -81,16 +83,54 @@ public class OperatorSnapshotResult {
 		this.operatorStateRawFuture = operatorStateRawFuture;
 	}
 
-	public void cancel() {
-		cancelIfNotNull(getKeyedStateManagedFuture());
-		cancelIfNotNull(getOperatorStateManagedFuture());
-		cancelIfNotNull(getKeyedStateRawFuture());
-		cancelIfNotNull(getOperatorStateRawFuture());
+	public void cancel() throws Exception {
+		Exception exception = null;
+
+		try {
+			cancelIfNotNull(getKeyedStateManagedFuture());
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(
+				new Exception("Could not properly cancel managed keyed state future.", e),
+				exception);
+		}
+
+		try {
+			cancelIfNotNull(getOperatorStateManagedFuture());
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(
+				new Exception("Could not properly cancel managed operator state future.", e),
+				exception);
+		}
+
+		try {
+			cancelIfNotNull(getKeyedStateRawFuture());
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(
+				new Exception("Could not properly cancel raw keyed state future.", e),
+				exception);
+		}
+
+		try {
+			cancelIfNotNull(getOperatorStateRawFuture());
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(
+				new Exception("Could not properly cancel raw operator state future.", e),
+				exception);
+		}
+
+		if (exception != null) {
+			throw exception;
+		}
 	}
 
-	private static void cancelIfNotNull(Future<?> future) {
+	private static <T extends StreamStateHandle> void cancelIfNotNull(RunnableFuture<T> future) throws Exception {
 		if (null != future) {
-			future.cancel(true);
+			if (!future.cancel(true)) {
+				// the cancellation was not successful because it might have been completed before
+				StreamStateHandle streamStateHandle = FutureUtil.runIfNotDoneAndGet(future);
+
+				streamStateHandle.discardState();
+			}
 		}
 	}
 }
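
Editor's note: the new cancelIfNotNull encodes a small but important idiom - cancelling a RunnableFuture can fail simply because the snapshot already completed, in which case the produced state handle must be materialized and discarded explicitly, or its backing resources would leak. Below is a self-contained sketch of that cancel-or-discard idiom, using a hypothetical Disposable interface in place of StreamStateHandle and inlining the run-if-not-done-and-get step.

import java.util.concurrent.RunnableFuture;

final class CancelOrDiscardSketch {

	/** Hypothetical stand-in for a state handle that owns external resources. */
	interface Disposable {
		void discard() throws Exception;
	}

	static void cancelOrDiscard(RunnableFuture<? extends Disposable> future) throws Exception {
		if (future == null) {
			return;
		}
		if (!future.cancel(true)) {
			// cancellation failed, most likely because the future already completed;
			// run it if it has not been run yet, then discard the produced state
			if (!future.isDone()) {
				future.run();
			}
			Disposable result = future.get();
			if (result != null) {
				result.discard();
			}
		}
	}
}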

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 754b754..f43f8b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -231,18 +231,25 @@ public class AsyncWaitOperator<IN, OUT>
 		super.snapshotState(context);
 
 		ListState<StreamElement> partitionableState =
-				getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
+			getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
 		partitionableState.clear();
 
 		Collection<StreamElementQueueEntry<?>> values = queue.values();
 
-		for (StreamElementQueueEntry<?> value : values) {
-			partitionableState.add(value.getStreamElement());
-		}
+		try {
+			for (StreamElementQueueEntry<?> value : values) {
+				partitionableState.add(value.getStreamElement());
+			}
+
+			// add the pending stream element queue entry if the stream element queue is currently full
+			if (pendingStreamElementQueueEntry != null) {
+				partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
+			}
+		} catch (Exception e) {
+			partitionableState.clear();
 
-		// add the pending stream element queue entry if the stream element queue is currently full
-		if (pendingStreamElementQueueEntry != null) {
-			partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
+			throw new Exception("Could not add stream element queue entries to operator state " +
+				"backend of operator " + getOperatorName() + '.', e);
 		}
 	}
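
Editor's note: the idiom in this hunk (and in the GenericWriteAheadSink change below) is to clear the partially written list state when an add fails, so a later attempt does not see half of the new entries, and then rethrow with the operator name for context. A minimal sketch under those assumptions follows; SimpleListState is a hypothetical, reduced stand-in for Flink's ListState.

import java.util.List;

final class ClearOnFailureSketch {

	/** Hypothetical, reduced stand-in for a partitionable list state. */
	interface SimpleListState<T> {
		void add(T value) throws Exception;
		void clear();
	}

	static <T> void replaceState(SimpleListState<T> state, List<T> values, String operatorName) throws Exception {
		state.clear();
		try {
			for (T value : values) {
				state.add(value);
			}
		} catch (Exception e) {
			// do not leave a half-written set of entries behind
			state.clear();
			throw new Exception("Could not add values to the operator state backend of operator "
				+ operatorName + '.', e);
		}
	}
}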
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 564fa22..7a571ec 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -162,9 +162,17 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 		saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());
 
 		this.checkpointedState.clear();
-		for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
-			// create a new partition for each entry.
-			this.checkpointedState.add(pendingCheckpoint);
+
+		try {
+			for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
+				// create a new partition for each entry.
+				this.checkpointedState.add(pendingCheckpoint);
+			}
+		} catch (Exception e) {
+			checkpointedState.clear();
+
+			throw new Exception("Could not add pending checkpoints to operator state " +
+				"backend of operator " + getOperatorName() + '.', e);
 		}
 
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 265cb5c..625e279 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -591,8 +592,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
 				// yet be created
 				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
+				Exception exception = null;
+
 				for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-					output.writeBufferToAllChannels(EventSerializer.toBuffer(message));
+					try {
+						output.writeBufferToAllChannels(EventSerializer.toBuffer(message));
+					} catch (Exception e) {
+						exception = ExceptionUtils.firstOrSuppressed(
+							new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
+							exception);
+					}
+				}
+
+				if (exception != null) {
+					throw exception;
 				}
 
 				return false;
@@ -958,7 +971,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// cleanup/release ongoing snapshot operations
 			for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) {
 				if (null != snapshotResult) {
-					snapshotResult.cancel();
+					try {
+						snapshotResult.cancel();
+					} catch (Exception e) {
+						LOG.warn("Could not properly cancel operator snapshot result in async " +
+							"checkpoint runnable.", e);
+					}
 				}
 			}
 		}
@@ -1022,24 +1040,30 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("{} - finished synchronous part of checkpoint {}." +
-									"Alignment duration: {} ms, snapshot duration {} ms",
-							owner.getName(), checkpointMetaData.getCheckpointId(),
-							checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
-							checkpointMetaData.getSyncDurationMillis());
+							" Alignment duration: {} ms, snapshot duration {} ms",
+						owner.getName(), checkpointMetaData.getCheckpointId(),
+						checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
+						checkpointMetaData.getSyncDurationMillis());
 				}
 			} finally {
 				if (failed) {
 					// Cleanup to release resources
 					for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
 						if (null != operatorSnapshotResult) {
-							operatorSnapshotResult.cancel();
+							try {
+								operatorSnapshotResult.cancel();
+							} catch (Exception e) {
+								LOG.warn("Could not properly cancel an operator snapshot result.", e);
+							}
 						}
 					}
 
 					if (LOG.isDebugEnabled()) {
 						LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
-										"Alignment duration: {} ms, snapshot duration {} ms",
-								owner.getName(), checkpointMetaData.getCheckpointId());
+								" Alignment duration: {} ms, snapshot duration {} ms",
+							owner.getName(), checkpointMetaData.getCheckpointId(),
+							checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
+							checkpointMetaData.getSyncDurationMillis());
 					}
 				}
 			}
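
Editor's note: the loop over the result partition writers shows the aggregation strategy used throughout this change - keep iterating even when one step fails, remember the first exception, attach later ones as suppressed, and only throw once the loop has finished. A small self-contained sketch of that strategy is below; firstOrSuppressed here is a local helper written for the sketch, not the Flink utility.

import java.util.Arrays;
import java.util.List;

final class FirstOrSuppressedSketch {

	/** Keeps the first failure as the primary exception and attaches later ones as suppressed. */
	static Exception firstOrSuppressed(Exception newException, Exception previous) {
		if (previous == null) {
			return newException;
		}
		previous.addSuppressed(newException);
		return previous;
	}

	/** Runs every task even if some fail and reports all failures in one exception. */
	static void runAll(List<Runnable> tasks) throws Exception {
		Exception exception = null;
		for (Runnable task : tasks) {
			try {
				task.run();
			} catch (Exception e) {
				exception = firstOrSuppressed(e, exception);
			}
		}
		if (exception != null) {
			throw exception;
		}
	}

	public static void main(String[] args) {
		try {
			runAll(Arrays.asList(
				() -> System.out.println("first writer notified"),
				() -> { throw new RuntimeException("second writer failed"); },
				() -> System.out.println("third writer still notified")));
		} catch (Exception e) {
			System.out.println("reported once, after all writers were tried: " + e.getMessage());
		}
	}
}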

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index f4051c9..409a732 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -19,35 +19,61 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.RunnableFuture;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
  * Tests for the facilities provided by {@link AbstractStreamOperator}. This mostly
  * tests timers and state and whether they are correctly checkpointed/restored
  * with key-group reshuffling.
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(AbstractStreamOperator.class)
 public class AbstractStreamOperatorTest {
 
 	@Test
@@ -453,6 +479,133 @@ public class AbstractStreamOperatorTest {
 	}
 
 	/**
+	 * Checks that the state snapshot context is closed after a successful snapshot operation.
+	 */
+	@Test
+	public void testSnapshotMethod() throws Exception {
+		final long checkpointId = 42L;
+		final long timestamp = 1L;
+
+		final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+		StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
+
+		whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
+
+		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+		StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
+		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
+
+		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
+		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod();
+		doReturn(containingTask).when(operator).getContainingTask();
+
+		operator.snapshotState(checkpointId, timestamp, streamFactory);
+
+		verify(context).close();
+	}
+
+	/**
+	 * Tests that the created StateSnapshotContextSynchronousImpl is closed in case of a failing
+	 * Operator#snapshotState(StateSnapshotContextSynchronousImpl) call.
+	 */
+	@Test
+	public void testFailingSnapshotMethod() throws Exception {
+		final long checkpointId = 42L;
+		final long timestamp = 1L;
+
+		final Exception failingException = new Exception("Test exception");
+
+		final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+		StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
+
+		whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
+
+		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+		StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
+		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
+
+		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
+		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod();
+		doReturn(containingTask).when(operator).getContainingTask();
+
+		// lets fail when calling the actual snapshotState method
+		doThrow(failingException).when(operator).snapshotState(eq(context));
+
+		try {
+			operator.snapshotState(checkpointId, timestamp, streamFactory);
+			fail("Exception expected.");
+		} catch (Exception e) {
+			assertEquals(failingException, e.getCause());
+		}
+
+		verify(context).close();
+	}
+
+	/**
+	 * Tests that a failing snapshot method call to the keyed state backend will trigger the closing
+	 * of the StateSnapshotContextSynchronousImpl and the cancellation of the
+	 * OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures.
+	 */
+	@Test
+	public void testFailingBackendSnapshotMethod() throws Exception {
+		final long checkpointId = 42L;
+		final long timestamp = 1L;
+
+		final Exception failingException = new Exception("Test exception");
+
+		final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+		RunnableFuture<KeyGroupsStateHandle> futureKeyGroupStateHandle = mock(RunnableFuture.class);
+		RunnableFuture<OperatorStateHandle> futureOperatorStateHandle = mock(RunnableFuture.class);
+
+		StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
+		when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyGroupStateHandle);
+		when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);
+
+		OperatorSnapshotResult operatorSnapshotResult = spy(new OperatorSnapshotResult());
+
+		whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
+		whenNew(OperatorSnapshotResult.class).withAnyArguments().thenReturn(operatorSnapshotResult);
+
+		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+		StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
+		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
+
+		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
+		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod();
+		doReturn(containingTask).when(operator).getContainingTask();
+
+		RunnableFuture<OperatorStateHandle> futureManagedOperatorStateHandle = mock(RunnableFuture.class);
+
+		OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class);
+		when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenReturn(futureManagedOperatorStateHandle);
+
+		AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);
+		when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenThrow(failingException);
+
+		Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
+		Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
+
+		try {
+			operator.snapshotState(checkpointId, timestamp, streamFactory);
+			fail("Exception expected.");
+		} catch (Exception e) {
+			assertEquals(failingException, e.getCause());
+		}
+
+		// verify that the context has been closed, the operator snapshot result has been cancelled
+		// and that all futures have been cancelled.
+		verify(context).close();
+		verify(operatorSnapshotResult).cancel();
+
+		verify(futureKeyGroupStateHandle).cancel(anyBoolean());
+		verify(futureOperatorStateHandle).cancel(anyBoolean());
+	}
+
+	/**
 	 * Extracts the result values from the test harness and clears the output queue.
 	 */
 	@SuppressWarnings({"unchecked", "rawtypes"})

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
index 7e0ce5b..490df52 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
@@ -20,80 +20,59 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.junit.Assert;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
-public class OperatorSnapshotResultTest {
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
 
+public class OperatorSnapshotResultTest extends TestLogger {
+
+	/**
+	 * Tests that all runnable futures in an OperatorSnapshotResult are properly cancelled and that,
+	 * if the StreamStateHandle result is retrievable, the state handles are discarded.
+	 */
 	@Test
-	public void testCancel() {
+	public void testCancelAndCleanup() throws Exception {
 		OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult();
 
 		operatorSnapshotResult.cancel();
 
-		RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture = new TestRunnableFuture<>();
-		RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture = new TestRunnableFuture<>();
-		RunnableFuture<OperatorStateHandle> operatorStateManagedFuture = new TestRunnableFuture<>();
-		RunnableFuture<OperatorStateHandle> operatorStateRawFuture = new TestRunnableFuture<>();
-
-		operatorSnapshotResult = new OperatorSnapshotResult(
-				keyedStateManagedFuture,
-				keyedStateRawFuture,
-				operatorStateManagedFuture,
-				operatorStateRawFuture);
-
-		operatorSnapshotResult.cancel();
-
-		Assert.assertTrue(keyedStateManagedFuture.isCancelled());
-		Assert.assertTrue(keyedStateRawFuture.isCancelled());
-		Assert.assertTrue(operatorStateManagedFuture.isCancelled());
-		Assert.assertTrue(operatorStateRawFuture.isCancelled());
-
-	}
+		KeyGroupsStateHandle keyedManagedStateHandle = mock(KeyGroupsStateHandle.class);
+		RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class);
+		when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
 
-	static final class TestRunnableFuture<T> implements RunnableFuture<T> {
+		KeyGroupsStateHandle keyedRawStateHandle = mock(KeyGroupsStateHandle.class);
+		RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture = mock(RunnableFuture.class);
+		when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle);
 
-		private boolean canceled;
+		OperatorStateHandle operatorManagedStateHandle = mock(OperatorStateHandle.class);
+		RunnableFuture<OperatorStateHandle> operatorStateManagedFuture = mock(RunnableFuture.class);
+		when(operatorStateManagedFuture.get()).thenReturn(operatorManagedStateHandle);
 
-		public TestRunnableFuture() {
-			this.canceled = false;
-		}
+		OperatorStateHandle operatorRawStateHandle = mock(OperatorStateHandle.class);
+		RunnableFuture<OperatorStateHandle> operatorStateRawFuture = mock(RunnableFuture.class);
+		when(operatorStateRawFuture.get()).thenReturn(operatorRawStateHandle);
 
-		@Override
-		public void run() {
-
-		}
-
-		@Override
-		public boolean cancel(boolean mayInterruptIfRunning) {
-			return canceled = true;
-		}
-
-		@Override
-		public boolean isCancelled() {
-			return canceled;
-		}
+		operatorSnapshotResult = new OperatorSnapshotResult(
+			keyedStateManagedFuture,
+			keyedStateRawFuture,
+			operatorStateManagedFuture,
+			operatorStateRawFuture);
 
-		@Override
-		public boolean isDone() {
-			return false;
-		}
+		operatorSnapshotResult.cancel();
 
-		@Override
-		public T get() throws InterruptedException, ExecutionException {
-			return null;
-		}
+		verify(keyedStateManagedFuture).cancel(true);
+		verify(keyedStateRawFuture).cancel(true);
+		verify(operatorStateManagedFuture).cancel(true);
+		verify(operatorStateRawFuture).cancel(true);
 
-		@Override
-		public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-			return null;
-		}
+		verify(keyedManagedStateHandle).discardState();
+		verify(keyedRawStateHandle).discardState();
+		verify(operatorManagedStateHandle).discardState();
+		verify(operatorRawStateHandle).discardState();
 	}
-
-
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4589757/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 2b2df4c..277ced5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -25,11 +25,22 @@ import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-public class StateSnapshotContextSynchronousImplTest {
+import java.io.Closeable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class StateSnapshotContextSynchronousImplTest extends TestLogger {
 
 	private StateSnapshotContextSynchronousImpl snapshotContext;
 
@@ -43,8 +54,8 @@ public class StateSnapshotContextSynchronousImplTest {
 
 	@Test
 	public void testMetaData() {
-		Assert.assertEquals(42, snapshotContext.getCheckpointId());
-		Assert.assertEquals(4711, snapshotContext.getCheckpointTimestamp());
+		assertEquals(42, snapshotContext.getCheckpointId());
+		assertEquals(4711, snapshotContext.getCheckpointTimestamp());
 	}
 
 	@Test
@@ -58,4 +69,58 @@ public class StateSnapshotContextSynchronousImplTest {
 		OperatorStateCheckpointOutputStream stream = snapshotContext.getRawOperatorStateOutput();
 		Assert.assertNotNull(stream);
 	}
-}
\ No newline at end of file
+
+	/**
+	 * Tests that closing the StateSnapshotContextSynchronousImpl will also close the associated
+	 * output streams.
+	 */
+	@Test
+	public void testStreamClosingWhenClosing() throws Exception {
+		long checkpointId = 42L;
+		long checkpointTimestamp = 1L;
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+		when(streamFactory.createCheckpointStateOutputStream(eq(checkpointId), eq(checkpointTimestamp))).thenReturn(outputStream1, outputStream2);
+
+		InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
+
+		KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
+
+		StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
+			checkpointId,
+			checkpointTimestamp,
+			streamFactory,
+			keyGroupRange,
+			closableRegistry);
+
+		// creating the output streams
+		context.getRawKeyedOperatorStateOutput();
+		context.getRawOperatorStateOutput();
+
+		verify(streamFactory, times(2)).createCheckpointStateOutputStream(eq(checkpointId), eq(checkpointTimestamp));
+
+		assertEquals(2, closableRegistry.size());
+		assertTrue(closableRegistry.contains(outputStream1));
+		assertTrue(closableRegistry.contains(outputStream2));
+
+		context.close();
+
+		verify(outputStream1).close();
+		verify(outputStream2).close();
+
+		assertEquals(0, closableRegistry.size());
+	}
+
+	static final class InsightCloseableRegistry extends CloseableRegistry {
+		public int size() {
+			return closeableToRef.size();
+		}
+
+		public boolean contains(Closeable closeable) {
+			return closeableToRef.containsKey(closeable);
+		}
+	}
+}