You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/05 08:40:22 UTC

[flink] 02/04: [hotfix][tests] Remove problematic mocking from StreamTaskTest#testAsyncCheckpointingConcurrentCloseBeforeAcknowledge

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ec0fdca096bbe046c7b559bdc324f7ecd027e369
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Jul 3 14:12:25 2019 +0200

    [hotfix][tests] Remove problematic mocking from StreamTaskTest#testAsyncCheckpointingConcurrentCloseBeforeAcknowledge
    
    The problem was the whenNew mocking, which could lead to a StackOverflowError in
    StreamTask#AsyncCheckpointRunnable.
---
 .../streaming/runtime/tasks/StreamTaskTest.java    | 247 ++++++++++++++++-----
 1 file changed, 197 insertions(+), 50 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 4ed23c7..189dd3d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -57,6 +59,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -66,15 +69,18 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateBackendFactory;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
@@ -94,7 +100,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
-import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -104,6 +109,7 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.SupplierWithException;
@@ -124,15 +130,19 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
 import static org.hamcrest.Matchers.everyItem;
@@ -156,7 +166,6 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
  * Tests for {@link StreamTask}.
@@ -516,71 +525,56 @@ public class StreamTaskTest extends TestLogger {
 	@Test
 	public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exception {
 
-		final OneShotLatch createSubtask = new OneShotLatch();
-		final OneShotLatch completeSubtask = new OneShotLatch();
-
-		whenNew(OperatorSnapshotFinalizer.class).
-			withAnyArguments().
-			thenAnswer((Answer<OperatorSnapshotFinalizer>) invocation -> {
-					createSubtask.trigger();
-					completeSubtask.await();
-					Object[] arguments = invocation.getArguments();
-					return new OperatorSnapshotFinalizer((OperatorSnapshotFutures) arguments[0]);
-				}
-			);
-
-		KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class);
-		KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class);
-		OperatorStateHandle managedOperatorStateHandle = mock(OperatorStreamStateHandle.class);
-		OperatorStateHandle rawOperatorStateHandle = mock(OperatorStreamStateHandle.class);
+		final TestingKeyedStateHandle managedKeyedStateHandle = new TestingKeyedStateHandle();
+		final TestingKeyedStateHandle rawKeyedStateHandle = new TestingKeyedStateHandle();
+		final TestingOperatorStateHandle managedOperatorStateHandle = new TestingOperatorStateHandle();
+		final TestingOperatorStateHandle rawOperatorStateHandle = new TestingOperatorStateHandle();
 
+		final BlockingRunnableFuture<SnapshotResult<KeyedStateHandle>> rawKeyedStateHandleFuture = new BlockingRunnableFuture<>(2, SnapshotResult.of(rawKeyedStateHandle));
 		OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(
 			DoneFuture.of(SnapshotResult.of(managedKeyedStateHandle)),
-			DoneFuture.of(SnapshotResult.of(rawKeyedStateHandle)),
+			rawKeyedStateHandleFuture,
 			DoneFuture.of(SnapshotResult.of(managedOperatorStateHandle)),
 			DoneFuture.of(SnapshotResult.of(rawOperatorStateHandle)));
 
 		final StreamOperator<?> streamOperator = streamOperatorWithSnapshot(operatorSnapshotResult);
 
-		try (MockEnvironment mockEnvironment = spy(new MockEnvironmentBuilder().build())) {
+		final AcknowledgeDummyEnvironment mockEnvironment = new AcknowledgeDummyEnvironment();
 
-			RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
-				mockEnvironment,
-				operatorChain(streamOperator)));
-
-			waitTaskIsRunning(task.streamTask, task.invocationFuture);
-
-			final long checkpointId = 42L;
-			task.streamTask.triggerCheckpoint(
-				new CheckpointMetaData(checkpointId, 1L),
-				CheckpointOptions.forCheckpointWithDefaultLocation(),
-				false);
+		RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+			mockEnvironment,
+			operatorChain(streamOperator)));
 
-			createSubtask.await();
+		waitTaskIsRunning(task.streamTask, task.invocationFuture);
 
-			task.streamTask.cancel();
+		final long checkpointId = 42L;
+		task.streamTask.triggerCheckpoint(
+			new CheckpointMetaData(checkpointId, 1L),
+			CheckpointOptions.forCheckpointWithDefaultLocation(),
+			false);
 
-			completeSubtask.trigger();
+		rawKeyedStateHandleFuture.awaitRun();
 
-			// wait for the completion of the async task
-			ExecutorService executor = task.streamTask.getAsyncOperationsThreadPool();
-			executor.shutdown();
-			if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
-				fail("Executor did not shut down within the given timeout. This indicates that the " +
-					"checkpointing did not resume.");
-			}
+		task.streamTask.cancel();
 
-			// check that the checkpoint has not been acknowledged
-			verify(mockEnvironment, never()).acknowledgeCheckpoint(eq(checkpointId), any(CheckpointMetrics.class), any(TaskStateSnapshot.class));
+		final FutureUtils.ConjunctFuture<Void> discardFuture = FutureUtils.waitForAll(
+			Arrays.asList(
+				managedKeyedStateHandle.getDiscardFuture(),
+				rawKeyedStateHandle.getDiscardFuture(),
+				managedOperatorStateHandle.getDiscardFuture(),
+				rawOperatorStateHandle.getDiscardFuture()));
 
-			// check that the state handles have been discarded
-			verify(managedKeyedStateHandle).discardState();
-			verify(rawKeyedStateHandle).discardState();
-			verify(managedOperatorStateHandle).discardState();
-			verify(rawOperatorStateHandle).discardState();
+		// make sure that all state handles have been discarded
+		discardFuture.get();
 
-			task.waitForTaskCompletion(true);
+		try {
+			mockEnvironment.getAcknowledgeCheckpointFuture().get(10L, TimeUnit.MILLISECONDS);
+			fail("The checkpoint should not get acknowledged.");
+		} catch (TimeoutException expected) {
+			// future should not be completed
 		}
+
+		task.waitForTaskCompletion(true);
 	}
 
 	/**
@@ -1308,4 +1302,157 @@ public class StreamTaskTest extends TestLogger {
 			super.initializeState(context);
 		}
 	}
+
+	private static class TestingKeyedStateHandle implements KeyedStateHandle {
+
+		private static final long serialVersionUID = -2473861305282291582L;
+
+		private final transient CompletableFuture<Void> discardFuture = new CompletableFuture<>();
+
+		public CompletableFuture<Void> getDiscardFuture() {
+			return discardFuture;
+		}
+
+		@Override
+		public KeyGroupRange getKeyGroupRange() {
+			return KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
+		}
+
+		@Override
+		public TestingKeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+			return this;
+		}
+
+		@Override
+		public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
+		}
+
+		@Override
+		public void discardState() {
+			discardFuture.complete(null);
+		}
+
+		@Override
+		public long getStateSize() {
+			return 0L;
+		}
+	}
+
+	private static class TestingOperatorStateHandle implements OperatorStateHandle {
+
+		private static final long serialVersionUID = 923794934187614088L;
+
+		private final transient CompletableFuture<Void> discardFuture = new CompletableFuture<>();
+
+		public CompletableFuture<Void> getDiscardFuture() {
+			return discardFuture;
+		}
+
+		@Override
+		public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() {
+			return Collections.emptyMap();
+		}
+
+		@Override
+		public FSDataInputStream openInputStream() throws IOException {
+			throw new IOException("Cannot open input streams in testing implementation.");
+		}
+
+		@Override
+		public StreamStateHandle getDelegateStateHandle() {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			discardFuture.complete(null);
+		}
+
+		@Override
+		public long getStateSize() {
+			return 0L;
+		}
+	}
+
+	private static class AcknowledgeDummyEnvironment extends DummyEnvironment {
+
+		private final CompletableFuture<Long> acknowledgeCheckpointFuture = new CompletableFuture<>();
+
+		public CompletableFuture<Long> getAcknowledgeCheckpointFuture() {
+			return acknowledgeCheckpointFuture;
+		}
+
+		@Override
+		public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
+			acknowledgeCheckpointFuture.complete(checkpointId);
+		}
+
+		@Override
+		public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
+			acknowledgeCheckpointFuture.complete(checkpointId);
+		}
+	}
+
+	private static final class BlockingRunnableFuture<V> implements RunnableFuture<V> {
+
+		private final CompletableFuture<V> future = new CompletableFuture<>();
+
+		private final OneShotLatch signalRunLatch = new OneShotLatch();
+
+		private final CountDownLatch continueRunLatch;
+
+		private final V value;
+
+		private BlockingRunnableFuture(int parties, V value) {
+			this.continueRunLatch = new CountDownLatch(parties);
+			this.value = value;
+		}
+
+		@Override
+		public void run() {
+			signalRunLatch.trigger();
+			continueRunLatch.countDown();
+
+			try {
+				// poor man's barrier because it can happen that the async operations thread gets
+				// interrupted by the mail box thread. The CyclicBarrier would in this case fail
+				// all participants of the barrier, leaving the future uncompleted
+				continueRunLatch.await();
+			} catch (InterruptedException e) {
+				ExceptionUtils.rethrow(e);
+			}
+
+			future.complete(value);
+		}
+
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			return false;
+		}
+
+		@Override
+		public boolean isCancelled() {
+			return future.isCancelled();
+		}
+
+		@Override
+		public boolean isDone() {
+			return future.isDone();
+		}
+
+		@Override
+		public V get() throws InterruptedException, ExecutionException {
+			return future.get();
+		}
+
+		@Override
+		public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+			return future.get(timeout, unit);
+		}
+
+		void awaitRun() throws InterruptedException {
+			signalRunLatch.await();
+		}
+	}
 }