You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/01/23 16:37:32 UTC

[2/2] flink git commit: [FLINK-5229] [state] Cleanup of operator snapshots if subsequent operator snapshots fail

[FLINK-5229] [state] Cleanup of operator snapshots if subsequent operator snapshots fail

This PR adds operator state cleanup to the StreamTask class. If a stream task contains multiple
stream operators, then every operator is checkpointed. In case that a snapshot operation fails
all state handles and OperatorSnapshotResults belonging to previous operators have to be freed.

Add test cases for failing checkpoint operations in StreamTask

Address PR comments

This closes #3183.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/840b779c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/840b779c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/840b779c

Branch: refs/heads/release-1.2
Commit: 840b779c542462cf7bd4bed40620dd68e90ec6bd
Parents: 006fcc4
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 20 14:28:44 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 23 17:36:55 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/state/StateUtil.java   |  21 +++
 .../api/operators/OperatorSnapshotResult.java   |  22 +--
 .../streaming/runtime/tasks/StreamTask.java     |  56 +++++-
 .../streaming/runtime/tasks/StreamTaskTest.java | 182 ++++++++++++++++++-
 4 files changed, 251 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/840b779c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index a4799bf..19afdec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.util.FutureUtil;
+
+import java.util.concurrent.RunnableFuture;
+
 /**
  * Helpers for {@link StateObject} related code.
  */
@@ -61,4 +65,21 @@ public class StateUtil {
 			}
 		}
 	}
+
+	/**
+	 * Discards the given state future by first trying to cancel it. If this is not possible, then
+	 * the state object contained in the future is calculated and afterwards discarded.
+	 *
+	 * @param stateFuture to be discarded
+	 * @throws Exception if the discard operation failed
+	 */
+	public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture) throws Exception {
+		if (null != stateFuture) {
+			if (!stateFuture.cancel(true)) {
+				StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
+
+				stateObject.discardState();
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/840b779c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 913928f..5a6c37b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -20,9 +20,8 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FutureUtil;
 
 import java.util.concurrent.RunnableFuture;
 
@@ -87,7 +86,7 @@ public class OperatorSnapshotResult {
 		Exception exception = null;
 
 		try {
-			cancelIfNotNull(getKeyedStateManagedFuture());
+			StateUtil.discardStateFuture(getKeyedStateManagedFuture());
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(
 				new Exception("Could not properly cancel managed keyed state future.", e),
@@ -95,7 +94,7 @@ public class OperatorSnapshotResult {
 		}
 
 		try {
-			cancelIfNotNull(getOperatorStateManagedFuture());
+			StateUtil.discardStateFuture(getOperatorStateManagedFuture());
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(
 				new Exception("Could not properly cancel managed operator state future.", e),
@@ -103,7 +102,7 @@ public class OperatorSnapshotResult {
 		}
 
 		try {
-			cancelIfNotNull(getKeyedStateRawFuture());
+			StateUtil.discardStateFuture(getKeyedStateRawFuture());
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(
 				new Exception("Could not properly cancel raw keyed state future.", e),
@@ -111,7 +110,7 @@ public class OperatorSnapshotResult {
 		}
 
 		try {
-			cancelIfNotNull(getOperatorStateRawFuture());
+			StateUtil.discardStateFuture(getOperatorStateRawFuture());
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(
 				new Exception("Could not properly cancel raw operator state future.", e),
@@ -122,15 +121,4 @@ public class OperatorSnapshotResult {
 			throw exception;
 		}
 	}
-
-	private static <T extends StreamStateHandle> void cancelIfNotNull(RunnableFuture<T> future) throws Exception {
-		if (null != future) {
-			if (!future.cancel(true)) {
-				// the cancellation was not successful because it might have been completed before
-				StreamStateHandle streamStateHandle = FutureUtil.runIfNotDoneAndGet(future);
-
-				streamStateHandle.discardState();
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/840b779c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 775475d..95f9d17 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -530,7 +531,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// propagate exceptions only if the task is still in "running" state
 			if (isRunning) {
 				throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
-					"for operator " + getName() + '.', e);
+					" for operator " + getName() + '.', e);
 			} else {
 				LOG.debug("Could not perform checkpoint {} for operator {} while the " +
 					"invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
@@ -953,12 +954,19 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 							owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
 				}
 			} catch (Exception e) {
+				try {
+					cleanup();
+				} catch (Exception cleanupException) {
+					e.addSuppressed(cleanupException);
+				}
+
 				// registers the exception and tries to fail the whole task
 				AsynchronousException asyncException = new AsynchronousException(
 					new Exception(
 						"Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() +
 							" for operator " + owner.getName() + '.',
 						e));
+
 				owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
 			} finally {
 				owner.cancelables.unregisterClosable(this);
@@ -967,17 +975,37 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		@Override
 		public void close() {
-			// cleanup/release ongoing snapshot operations
-			for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) {
-				if (null != snapshotResult) {
+			try {
+				cleanup();
+			} catch (Exception cleanupException) {
+				LOG.warn("Could not properly clean up the async checkpoint runnable.", cleanupException);
+			}
+		}
+
+		private void cleanup() throws Exception {
+			Exception exception = null;
+
+			// clean up ongoing operator snapshot results and non partitioned state handles
+			for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
+				if (operatorSnapshotResult != null) {
 					try {
-						snapshotResult.cancel();
-					} catch (Exception e) {
-						LOG.warn("Could not properly cancel operator snapshot result in async " +
-							"checkpoint runnable.", e);
+						operatorSnapshotResult.cancel();
+					} catch (Exception cancelException) {
+						exception = ExceptionUtils.firstOrSuppressed(cancelException, exception);
 					}
 				}
 			}
+
+			// discard non partitioned state handles
+			try {
+				StateUtil.bestEffortDiscardAllStateObjects(nonPartitionedStateHandles);
+			} catch (Exception discardException) {
+				exception = ExceptionUtils.firstOrSuppressed(discardException, exception);
+			}
+
+			if (null != exception) {
+				throw exception;
+			}
 		}
 	}
 
@@ -1057,6 +1085,18 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 						}
 					}
 
+					// Cleanup non partitioned state handles
+					for (StreamStateHandle nonPartitionedState : nonPartitionedStates) {
+						if (nonPartitionedState != null) {
+							try {
+								nonPartitionedState.discardState();
+							} catch (Exception e) {
+								LOG.warn("Could not properly discard a non partitioned " +
+									"state. This might leave some orphaned files behind.", e);
+							}
+						}
+					}
+
 					if (LOG.isDebugEnabled()) {
 						LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
 								"Alignment duration: {} ms, snapshot duration {} ms",

http://git-wip-us.apache.org/repos/asf/flink/blob/840b779c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index b55c288..ffdb09d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import akka.dispatch.Futures;
 
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.Environment;
@@ -51,9 +55,12 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -65,16 +72,20 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import scala.concurrent.Await;
@@ -90,7 +101,9 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.PriorityQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -98,10 +111,14 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
 
-public class StreamTaskTest {
+public class StreamTaskTest extends TestLogger {
 
 	private static OneShotLatch SYNC_LATCH;
 
@@ -170,8 +187,8 @@ public class StreamTaskTest {
 		task.getExecutingThread().join();
 
 		// ensure that the state backends are closed
-		Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
-		Mockito.verify(StateBackendTestSource.keyedStateBackend).close();
+		verify(StateBackendTestSource.operatorStateBackend).close();
+		verify(StateBackendTestSource.keyedStateBackend).close();
 
 		assertEquals(ExecutionState.FINISHED, task.getExecutionState());
 	}
@@ -194,8 +211,8 @@ public class StreamTaskTest {
 		task.getExecutingThread().join();
 
 		// ensure that the state backends are closed
-		Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
-		Mockito.verify(StateBackendTestSource.keyedStateBackend).close();
+		verify(StateBackendTestSource.operatorStateBackend).close();
+		verify(StateBackendTestSource.keyedStateBackend).close();
 
 		assertEquals(ExecutionState.FAILED, task.getExecutionState());
 	}
@@ -240,6 +257,161 @@ public class StreamTaskTest {
 		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
 	}
 
+	@Test
+	public void testFailingCheckpointStreamOperator() throws Exception {
+		final long checkpointId = 42L;
+		final long timestamp = 1L;
+
+		TaskInfo mockTaskInfo = mock(TaskInfo.class);
+		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+		Environment mockEnvironment = mock(Environment.class);
+		when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+
+		StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+		streamTask.setEnvironment(mockEnvironment);
+
+		StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+		StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+		StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+		OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
+		OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
+
+		final Exception testException = new Exception("Test exception");
+
+		when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
+		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
+		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenThrow(testException);
+
+		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
+
+		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
+		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
+		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+		when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
+		when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
+		when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+
+		CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
+		when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(
+			outStream1, outStream2, outStream3);
+
+		AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
+		when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+
+		Whitebox.setInternalState(streamTask, "isRunning", true);
+		Whitebox.setInternalState(streamTask, "lock", new Object());
+		Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+
+		try {
+			streamTask.triggerCheckpoint(checkpointMetaData);
+			fail("Expected test exception here.");
+		} catch (Exception e) {
+			assertEquals(testException, e.getCause());
+		}
+
+		verify(operatorSnapshotResult1).cancel();
+		verify(operatorSnapshotResult2).cancel();
+
+		verify(streamStateHandle1).discardState();
+		verify(streamStateHandle2).discardState();
+		verify(streamStateHandle3).discardState();
+	}
+
+	/**
+	 * Tests that in case of a failing AsyncCheckpointRunnable all operator snapshot results are
+	 * cancelled and all non partitioned state handles are discarded.
+	 */
+	@Test
+	public void testFailingAsyncCheckpointRunnable() throws Exception {
+		final long checkpointId = 42L;
+		final long timestamp = 1L;
+
+		TaskInfo mockTaskInfo = mock(TaskInfo.class);
+		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+		Environment mockEnvironment = mock(Environment.class);
+		when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+
+		StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+		streamTask.setEnvironment(mockEnvironment);
+
+		StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+		StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+		StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+		OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
+		OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
+		OperatorSnapshotResult operatorSnapshotResult3 = mock(OperatorSnapshotResult.class);
+
+		RunnableFuture<OperatorStateHandle> failingFuture = mock(RunnableFuture.class);
+		when(failingFuture.get()).thenThrow(new ExecutionException(new Exception("Test exception")));
+
+		when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
+
+		when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
+		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
+		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult3);
+
+		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
+
+		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
+		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
+		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+		when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
+		when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
+		when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+
+		CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
+		when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(
+			outStream1, outStream2, outStream3);
+
+		AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
+		when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+
+		Whitebox.setInternalState(streamTask, "isRunning", true);
+		Whitebox.setInternalState(streamTask, "lock", new Object());
+		Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", MoreExecutors.newDirectExecutorService());
+		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+
+		streamTask.triggerCheckpoint(checkpointMetaData);
+
+		verify(streamTask).handleAsyncException(anyString(), any(Throwable.class));
+
+		verify(operatorSnapshotResult1).cancel();
+		verify(operatorSnapshotResult2).cancel();
+		verify(operatorSnapshotResult3).cancel();
+
+		verify(streamStateHandle1).discardState();
+		verify(streamStateHandle2).discardState();
+		verify(streamStateHandle3).discardState();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Test Utilities
 	// ------------------------------------------------------------------------