You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/01/23 16:37:32 UTC
[2/2] flink git commit: [FLINK-5229] [state] Cleanup of operator
snapshots if subsequent operator snapshots fail
[FLINK-5229] [state] Cleanup of operator snapshots if subsequent operator snapshots fail
This PR adds operator state cleanup to the StreamTask class. If a stream task contains multiple
stream operators, then every operator is checkpointed. If one of these snapshot operations fails,
all state handles and OperatorSnapshotResults belonging to the previous operators have to be freed.
Add test cases for failing checkpoint operations in StreamTask
Address PR comments
This closes #3183.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/840b779c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/840b779c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/840b779c
Branch: refs/heads/release-1.2
Commit: 840b779c542462cf7bd4bed40620dd68e90ec6bd
Parents: 006fcc4
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 20 14:28:44 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 23 17:36:55 2017 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/state/StateUtil.java | 21 +++
.../api/operators/OperatorSnapshotResult.java | 22 +--
.../streaming/runtime/tasks/StreamTask.java | 56 +++++-
.../streaming/runtime/tasks/StreamTaskTest.java | 182 ++++++++++++++++++-
4 files changed, 251 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/840b779c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index a4799bf..19afdec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -18,6 +18,10 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.util.FutureUtil;
+
+import java.util.concurrent.RunnableFuture;
+
/**
* Helpers for {@link StateObject} related code.
*/
@@ -61,4 +65,21 @@ public class StateUtil {
}
}
}
+
+ /**
+ * Discards the given state future by first trying to cancel it. If this is not possible, then
+ * the state object contained in the future is calculated and, unless it is {@code null}, discarded.
+ *
+ * @param stateFuture to be discarded; a {@code null} future is ignored
+ * @throws Exception if the discard operation failed
+ */
+ public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture) throws Exception {
+ if (null != stateFuture && !stateFuture.cancel(true)) {
+ StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
+ // guard against a future that yields no state object: there is nothing to discard then
+ if (null != stateObject) {
+ stateObject.discardState();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/840b779c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 913928f..5a6c37b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -20,9 +20,8 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FutureUtil;
import java.util.concurrent.RunnableFuture;
@@ -87,7 +86,7 @@ public class OperatorSnapshotResult {
Exception exception = null;
try {
- cancelIfNotNull(getKeyedStateManagedFuture());
+ StateUtil.discardStateFuture(getKeyedStateManagedFuture());
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not properly cancel managed keyed state future.", e),
@@ -95,7 +94,7 @@ public class OperatorSnapshotResult {
}
try {
- cancelIfNotNull(getOperatorStateManagedFuture());
+ StateUtil.discardStateFuture(getOperatorStateManagedFuture());
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not properly cancel managed operator state future.", e),
@@ -103,7 +102,7 @@ public class OperatorSnapshotResult {
}
try {
- cancelIfNotNull(getKeyedStateRawFuture());
+ StateUtil.discardStateFuture(getKeyedStateRawFuture());
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not properly cancel raw keyed state future.", e),
@@ -111,7 +110,7 @@ public class OperatorSnapshotResult {
}
try {
- cancelIfNotNull(getOperatorStateRawFuture());
+ StateUtil.discardStateFuture(getOperatorStateRawFuture());
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not properly cancel raw operator state future.", e),
@@ -122,15 +121,4 @@ public class OperatorSnapshotResult {
throw exception;
}
}
-
- private static <T extends StreamStateHandle> void cancelIfNotNull(RunnableFuture<T> future) throws Exception {
- if (null != future) {
- if (!future.cancel(true)) {
- // the cancellation was not successful because it might have been completed before
- StreamStateHandle streamStateHandle = FutureUtil.runIfNotDoneAndGet(future);
-
- streamStateHandle.discardState();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/840b779c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 775475d..95f9d17 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -530,7 +531,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// propagate exceptions only if the task is still in "running" state
if (isRunning) {
throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
- "for operator " + getName() + '.', e);
+ " for operator " + getName() + '.', e);
} else {
LOG.debug("Could not perform checkpoint {} for operator {} while the " +
"invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
@@ -953,12 +954,19 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
}
} catch (Exception e) {
+ try {
+ cleanup();
+ } catch (Exception cleanupException) {
+ e.addSuppressed(cleanupException);
+ }
+
// registers the exception and tries to fail the whole task
AsynchronousException asyncException = new AsynchronousException(
new Exception(
"Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() +
" for operator " + owner.getName() + '.',
e));
+
owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
} finally {
owner.cancelables.unregisterClosable(this);
@@ -967,17 +975,37 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
@Override
public void close() {
- // cleanup/release ongoing snapshot operations
- for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) {
- if (null != snapshotResult) {
+ try { // cleanup failures are only logged here, they are not propagated to the caller of close()
+ cleanup();
+ } catch (Exception cleanupException) {
+ LOG.warn("Could not properly clean up the async checkpoint runnable.", cleanupException);
+ }
+ }
+
+ private void cleanup() throws Exception {
+ Exception exception = null; // collects cleanup failures so that every cleanup step is attempted
+
+ // cancel every ongoing operator snapshot result (null entries are skipped)
+ for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
+ if (operatorSnapshotResult != null) {
try {
- snapshotResult.cancel();
- } catch (Exception e) {
- LOG.warn("Could not properly cancel operator snapshot result in async " +
- "checkpoint runnable.", e);
+ operatorSnapshotResult.cancel();
+ } catch (Exception cancelException) {
+ exception = ExceptionUtils.firstOrSuppressed(cancelException, exception); // remember and continue; presumably later failures are attached as suppressed - confirm in ExceptionUtils
}
}
}
+
+ // discard non partitioned state handles
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(nonPartitionedStateHandles);
+ } catch (Exception discardException) {
+ exception = ExceptionUtils.firstOrSuppressed(discardException, exception);
+ }
+
+ if (null != exception) { // report the collected failure only after both cleanup steps were attempted
+ throw exception;
+ }
}
}
@@ -1057,6 +1085,18 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
+ // Cleanup non partitioned state handles
+ for (StreamStateHandle nonPartitionedState : nonPartitionedStates) {
+ if (nonPartitionedState != null) {
+ try {
+ nonPartitionedState.discardState();
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard a non partitioned " +
+ "state. This might leave some orphaned files behind.", e);
+ }
+ }
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
"Alignment duration: {} ms, snapshot duration {} ms",
http://git-wip-us.apache.org/repos/asf/flink/blob/840b779c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index b55c288..ffdb09d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.streaming.runtime.tasks;
import akka.dispatch.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
@@ -51,9 +55,12 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
@@ -65,16 +72,20 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import scala.concurrent.Await;
@@ -90,7 +101,9 @@ import java.net.URL;
import java.util.Collections;
import java.util.Comparator;
import java.util.PriorityQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -98,10 +111,14 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
-public class StreamTaskTest {
+public class StreamTaskTest extends TestLogger {
private static OneShotLatch SYNC_LATCH;
@@ -170,8 +187,8 @@ public class StreamTaskTest {
task.getExecutingThread().join();
// ensure that the state backends are closed
- Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
- Mockito.verify(StateBackendTestSource.keyedStateBackend).close();
+ verify(StateBackendTestSource.operatorStateBackend).close();
+ verify(StateBackendTestSource.keyedStateBackend).close();
assertEquals(ExecutionState.FINISHED, task.getExecutionState());
}
@@ -194,8 +211,8 @@ public class StreamTaskTest {
task.getExecutingThread().join();
// ensure that the state backends are closed
- Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
- Mockito.verify(StateBackendTestSource.keyedStateBackend).close();
+ verify(StateBackendTestSource.operatorStateBackend).close();
+ verify(StateBackendTestSource.keyedStateBackend).close();
assertEquals(ExecutionState.FAILED, task.getExecutionState());
}
@@ -240,6 +257,161 @@ public class StreamTaskTest {
assertEquals(ExecutionState.CANCELED, task.getExecutionState());
}
+ @Test
+ public void testFailingCheckpointStreamOperator() throws Exception { // checks the cleanup performed when the third of three operators fails its snapshot
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+ when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+ Environment mockEnvironment = mock(Environment.class);
+ when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+
+ StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS); // partial mock: unstubbed calls run the real StreamTask code
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+ streamTask.setEnvironment(mockEnvironment);
+
+ StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+ StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+ StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+ OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
+ OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
+
+ final Exception testException = new Exception("Test exception");
+
+ when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
+ when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
+ when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenThrow(testException); // the third operator fails its snapshot
+
+ StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
+
+ OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+ when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+ StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
+ StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
+ StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
+
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+ when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
+ when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
+ when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+
+ CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
+ when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(
+ outStream1, outStream2, outStream3); // consecutive calls hand out the three streams in this order
+
+ AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
+ when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+
+ Whitebox.setInternalState(streamTask, "isRunning", true); // the mocked task never ran its setup, so its internal fields are injected directly
+ Whitebox.setInternalState(streamTask, "lock", new Object());
+ Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+ Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+ Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+ Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+
+ try {
+ streamTask.triggerCheckpoint(checkpointMetaData);
+ fail("Expected test exception here.");
+ } catch (Exception e) {
+ assertEquals(testException, e.getCause()); // the operator's failure must be the direct cause of the thrown exception
+ }
+
+ verify(operatorSnapshotResult1).cancel(); // results of the two operators that did snapshot must be cancelled
+ verify(operatorSnapshotResult2).cancel();
+
+ verify(streamStateHandle1).discardState(); // all three stream state handles must be discarded
+ verify(streamStateHandle2).discardState();
+ verify(streamStateHandle3).discardState();
+ }
+
+ /**
+ * Tests that in case of a failing AsyncCheckpointRunnable all operator snapshot results are
+ * cancelled and all non partitioned state handles are discarded.
+ */
+ @Test
+ public void testFailingAsyncCheckpointRunnable() throws Exception {
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+ when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+ Environment mockEnvironment = mock(Environment.class);
+ when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+
+ StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS); // partial mock: unstubbed calls run the real StreamTask code
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+ streamTask.setEnvironment(mockEnvironment);
+
+ StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+ StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+ StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+ OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
+ OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
+ OperatorSnapshotResult operatorSnapshotResult3 = mock(OperatorSnapshotResult.class);
+
+ RunnableFuture<OperatorStateHandle> failingFuture = mock(RunnableFuture.class);
+ when(failingFuture.get()).thenThrow(new ExecutionException(new Exception("Test exception"))); // retrieving the result of this future fails
+
+ when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture); // only the third operator's raw operator state future fails
+
+ when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
+ when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
+ when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult3);
+
+ StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
+
+ OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+ when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+ StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
+ StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
+ StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
+
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+ when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
+ when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
+ when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+
+ CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
+ when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(
+ outStream1, outStream2, outStream3); // consecutive calls hand out the three streams in this order
+
+ AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
+ when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+
+ Whitebox.setInternalState(streamTask, "isRunning", true); // the mocked task never ran its setup, so its internal fields are injected directly
+ Whitebox.setInternalState(streamTask, "lock", new Object());
+ Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+ Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+ Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", MoreExecutors.newDirectExecutorService()); // direct executor; presumably so the async checkpoint part finishes in the test thread before the verifications - confirm against StreamTask
+ Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+ Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+
+ streamTask.triggerCheckpoint(checkpointMetaData);
+
+ verify(streamTask).handleAsyncException(anyString(), any(Throwable.class)); // the failure must be reported through handleAsyncException
+
+ verify(operatorSnapshotResult1).cancel(); // every operator snapshot result must be cancelled, including the failing one
+ verify(operatorSnapshotResult2).cancel();
+ verify(operatorSnapshotResult3).cancel();
+
+ verify(streamStateHandle1).discardState(); // all three stream state handles must be discarded
+ verify(streamStateHandle2).discardState();
+ verify(streamStateHandle3).discardState();
+ }
+
// ------------------------------------------------------------------------
// Test Utilities
// ------------------------------------------------------------------------