You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/12/14 15:51:02 UTC
flink git commit: [FLINK-5240][tests] ensure state backends are
properly closed
Repository: flink
Updated Branches:
refs/heads/master 8038ae4c8 -> bf2874e22
[FLINK-5240][tests] ensure state backends are properly closed
This adds additional test cases to verify the state backends are closed
properly upon the end of a task. The state backends should always be
closed regardless of the final state of the task.
This closes #2997.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf2874e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf2874e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf2874e2
Branch: refs/heads/master
Commit: bf2874e22a41ae195ea162f4e9c31e90a42a4c1a
Parents: 8038ae4
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Dec 13 15:21:31 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Dec 14 16:48:57 2016 +0100
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTaskTest.java | 115 ++++++++++++++++++-
1 file changed, 111 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bf2874e2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d04c456..b55c288 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -22,6 +22,7 @@ import akka.dispatch.Futures;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -47,7 +49,10 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
@@ -61,6 +66,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.ExceptionUtils;
@@ -68,6 +74,9 @@ import org.apache.flink.util.SerializedValue;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
@@ -144,7 +153,7 @@ public class StreamTaskTest {
}
@Test
- public void testStateBackendLoading() throws Exception {
+ public void testStateBackendLoadingAndClosing() throws Exception {
Configuration taskManagerConfig = new Configuration();
taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
@@ -152,16 +161,46 @@ public class StreamTaskTest {
cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
- Task task = createTask(SourceStreamTask.class, cfg, taskManagerConfig);
+ Task task = createTask(StateBackendTestSource.class, cfg, taskManagerConfig);
+ StateBackendTestSource.fail = false;
task.startTaskThread();
// wait for clean termination
task.getExecutingThread().join();
+
+ // ensure that the state backends are closed
+ Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
+ Mockito.verify(StateBackendTestSource.keyedStateBackend).close();
+
assertEquals(ExecutionState.FINISHED, task.getExecutionState());
}
@Test
+ public void testStateBackendClosingOnFailure() throws Exception {
+ Configuration taskManagerConfig = new Configuration();
+ taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
+
+ StreamConfig cfg = new StreamConfig(new Configuration());
+ cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
+ cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ Task task = createTask(StateBackendTestSource.class, cfg, taskManagerConfig);
+
+ StateBackendTestSource.fail = true;
+ task.startTaskThread();
+
+ // wait for clean termination
+ task.getExecutingThread().join();
+
+ // ensure that the state backends are closed
+ Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
+ Mockito.verify(StateBackendTestSource.keyedStateBackend).close();
+
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ }
+
+ @Test
public void testCancellationNotBlockedOnLock() throws Exception {
SYNC_LATCH = new OneShotLatch();
@@ -358,12 +397,42 @@ public class StreamTaskTest {
public void cancel() {}
}
+ /**
+ * Mocked state backend factory which returns mocks for the operator and keyed state backends.
+ */
public static final class MockStateBackend implements StateBackendFactory<AbstractStateBackend> {
private static final long serialVersionUID = 1L;
@Override
public AbstractStateBackend createFromConfig(Configuration config) throws Exception {
- return mock(AbstractStateBackend.class);
+ AbstractStateBackend stateBackendMock = mock(AbstractStateBackend.class);
+
+ Mockito.when(stateBackendMock.createOperatorStateBackend(
+ Mockito.any(Environment.class),
+ Mockito.any(String.class)))
+ .thenAnswer(new Answer<OperatorStateBackend>() {
+ @Override
+ public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return Mockito.mock(OperatorStateBackend.class);
+ }
+ });
+
+ Mockito.when(stateBackendMock.createKeyedStateBackend(
+ Mockito.any(Environment.class),
+ Mockito.any(JobID.class),
+ Mockito.any(String.class),
+ Mockito.any(TypeSerializer.class),
+ Mockito.any(int.class),
+ Mockito.any(KeyGroupRange.class),
+ Mockito.any(TaskKvStateRegistry.class)))
+ .thenAnswer(new Answer<AbstractKeyedStateBackend>() {
+ @Override
+ public AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return Mockito.mock(AbstractKeyedStateBackend.class);
+ }
+ });
+
+ return stateBackendMock;
}
}
@@ -371,6 +440,44 @@ public class StreamTaskTest {
// ------------------------------------------------------------------------
/**
+ * Source that instantiates the operator state backend and the keyed state backend.
+ * The created state backends can be retrieved from the static fields to check if the
+ * CloseableRegistry closed them correctly.
+ */
+ public static class StateBackendTestSource extends StreamTask<Long, StreamSource<Long, SourceFunction<Long>>> {
+
+ private static volatile boolean fail;
+
+ private static volatile OperatorStateBackend operatorStateBackend;
+ private static volatile AbstractKeyedStateBackend keyedStateBackend;
+
+ @Override
+ protected void init() throws Exception {
+ operatorStateBackend = createOperatorStateBackend(
+ Mockito.mock(StreamOperator.class),
+ null);
+ keyedStateBackend = createKeyedStateBackend(
+ Mockito.mock(TypeSerializer.class),
+ 4,
+ Mockito.mock(KeyGroupRange.class));
+ }
+
+ @Override
+ protected void run() throws Exception {
+ if (fail) {
+ throw new RuntimeException();
+ }
+ }
+
+ @Override
+ protected void cleanup() throws Exception {}
+
+ @Override
+ protected void cancelTask() throws Exception {}
+
+ }
+
+ /**
* A task that locks if cancellation attempts to cleanly shut down
*/
public static class CancelLockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
@@ -438,7 +545,7 @@ public class StreamTaskTest {
// we are at the point where cancelling can happen
SYNC_LATCH.trigger();
-
+
// try to acquire the lock - this is not possible as long as the lock holder
// thread lives
//noinspection SynchronizationOnLocalVariableOrMethodParameter