You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/12/14 15:51:02 UTC

flink git commit: [FLINK-5240][tests] ensure state backends are properly closed

Repository: flink
Updated Branches:
  refs/heads/master 8038ae4c8 -> bf2874e22


[FLINK-5240][tests] ensure state backends are properly closed

This adds additional test cases to verify the state backends are closed
properly upon the end of a task. The state backends should always be
closed regardless of the final state of the task.

This closes #2997.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf2874e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf2874e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf2874e2

Branch: refs/heads/master
Commit: bf2874e22a41ae195ea162f4e9c31e90a42a4c1a
Parents: 8038ae4
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Dec 13 15:21:31 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Dec 14 16:48:57 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTaskTest.java | 115 ++++++++++++++++++-
 1 file changed, 111 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bf2874e2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d04c456..b55c288 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -22,6 +22,7 @@ import akka.dispatch.Futures;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -47,7 +49,10 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateBackendFactory;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
@@ -61,6 +66,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
@@ -68,6 +74,9 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
@@ -144,7 +153,7 @@ public class StreamTaskTest {
 	}
 
 	@Test
-	public void testStateBackendLoading() throws Exception {
+	public void testStateBackendLoadingAndClosing() throws Exception {
 		Configuration taskManagerConfig = new Configuration();
 		taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
 
@@ -152,16 +161,46 @@ public class StreamTaskTest {
 		cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
 		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-		Task task = createTask(SourceStreamTask.class, cfg, taskManagerConfig);
+		Task task = createTask(StateBackendTestSource.class, cfg, taskManagerConfig);
 
+		StateBackendTestSource.fail = false;
 		task.startTaskThread();
 
 		// wait for clean termination
 		task.getExecutingThread().join();
+
+		// ensure that the state backends are closed
+		Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
+		Mockito.verify(StateBackendTestSource.keyedStateBackend).close();
+
 		assertEquals(ExecutionState.FINISHED, task.getExecutionState());
 	}
 
 	@Test
+	public void testStateBackendClosingOnFailure() throws Exception {
+		Configuration taskManagerConfig = new Configuration();
+		taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName()); // configure the mocked state backend factory
+
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
+		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		Task task = createTask(StateBackendTestSource.class, cfg, taskManagerConfig);
+
+		StateBackendTestSource.fail = true; // makes StateBackendTestSource#run() throw a RuntimeException
+		task.startTaskThread();
+
+		// wait for the task thread to terminate (the task is expected to end up FAILED, see the assertion below)
+		task.getExecutingThread().join();
+
+		// ensure that the state backends are closed even though the task failed
+		Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
+		Mockito.verify(StateBackendTestSource.keyedStateBackend).close();
+
+		assertEquals(ExecutionState.FAILED, task.getExecutionState());
+	}
+
+	@Test
 	public void testCancellationNotBlockedOnLock() throws Exception {
 		SYNC_LATCH = new OneShotLatch();
 
@@ -358,12 +397,42 @@ public class StreamTaskTest {
 		public void cancel() {}
 	}
 
+	/**
+	 * State backend factory which returns a mocked state backend; that mock in turn hands out mocks for the operator and keyed state backends.
+	 */
 	public static final class MockStateBackend implements StateBackendFactory<AbstractStateBackend> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
 		public AbstractStateBackend createFromConfig(Configuration config) throws Exception {
-			return mock(AbstractStateBackend.class);
+			AbstractStateBackend stateBackendMock = mock(AbstractStateBackend.class);
+
+			Mockito.when(stateBackendMock.createOperatorStateBackend(
+					Mockito.any(Environment.class),
+					Mockito.any(String.class)))
+				.thenAnswer(new Answer<OperatorStateBackend>() { // Answer creates a new mock on every invocation
+					@Override
+					public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+						return Mockito.mock(OperatorStateBackend.class);
+					}
+				});
+
+			Mockito.when(stateBackendMock.createKeyedStateBackend(
+					Mockito.any(Environment.class),
+					Mockito.any(JobID.class),
+					Mockito.any(String.class),
+					Mockito.any(TypeSerializer.class),
+					Mockito.any(int.class),
+					Mockito.any(KeyGroupRange.class),
+					Mockito.any(TaskKvStateRegistry.class)))
+				.thenAnswer(new Answer<AbstractKeyedStateBackend>() { // Answer creates a new mock on every invocation
+					@Override
+					public AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+						return Mockito.mock(AbstractKeyedStateBackend.class);
+					}
+				});
+
+			return stateBackendMock;
 		}
 	}
 
@@ -371,6 +440,44 @@ public class StreamTaskTest {
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Task that instantiates the operator state backend and the keyed state backend in init().
+	 * The created state backends can be retrieved from the static fields to check if the
+	 * CloseableRegistry closed them correctly. Setting the static 'fail' flag makes run() throw.
+	 */
+	public static class StateBackendTestSource extends StreamTask<Long, StreamSource<Long, SourceFunction<Long>>> {
+
+		private static volatile boolean fail; // when true, run() throws a RuntimeException
+
+		private static volatile OperatorStateBackend operatorStateBackend; // assigned in init()
+		private static volatile AbstractKeyedStateBackend keyedStateBackend; // assigned in init()
+
+		@Override
+		protected void init() throws Exception {
+			operatorStateBackend = createOperatorStateBackend(
+				Mockito.mock(StreamOperator.class),
+				null);
+			keyedStateBackend = createKeyedStateBackend(
+				Mockito.mock(TypeSerializer.class),
+				4,
+				Mockito.mock(KeyGroupRange.class));
+		}
+
+		@Override
+		protected void run() throws Exception {
+			if (fail) {
+				throw new RuntimeException();
+			}
+		}
+
+		@Override
+		protected void cleanup() throws Exception {} // intentionally empty
+
+		@Override
+		protected void cancelTask() throws Exception {} // intentionally empty
+
+	}
+
+	/**
 	 * A task that locks if cancellation attempts to cleanly shut down 
 	 */
 	public static class CancelLockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
@@ -438,7 +545,7 @@ public class StreamTaskTest {
 
 				// we are at the point where cancelling can happen
 				SYNC_LATCH.trigger();
-	
+
 				// try to acquire the lock - this is not possible as long as the lock holder
 				// thread lives
 				//noinspection SynchronizationOnLocalVariableOrMethodParameter