You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/05 08:40:22 UTC
[flink] 02/04: [hotfix][tests] Remove problematic mocking from
StreamTaskTest#testAsyncCheckpointingConcurrentCloseBeforeAcknowledge
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ec0fdca096bbe046c7b559bdc324f7ecd027e369
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Jul 3 14:12:25 2019 +0200
[hotfix][tests] Remove problematic mocking from StreamTaskTest#testAsyncCheckpointingConcurrentCloseBeforeAcknowledge
The problem was PowerMock's whenNew mocking, which could lead to a StackOverflowError in the
StreamTask#AsyncCheckpointRunnable.
---
.../streaming/runtime/tasks/StreamTaskTest.java | 247 ++++++++++++++++-----
1 file changed, 197 insertions(+), 50 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 4ed23c7..189dd3d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobCache;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
@@ -57,6 +59,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -66,15 +69,18 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
@@ -94,7 +100,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
-import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -104,6 +109,7 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
@@ -124,15 +130,19 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
import static org.hamcrest.Matchers.everyItem;
@@ -156,7 +166,6 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
/**
* Tests for {@link StreamTask}.
@@ -516,71 +525,56 @@ public class StreamTaskTest extends TestLogger {
@Test
public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exception {
-		final OneShotLatch createSubtask = new OneShotLatch();
-		final OneShotLatch completeSubtask = new OneShotLatch();
-
-		whenNew(OperatorSnapshotFinalizer.class).
-			withAnyArguments().
-			thenAnswer((Answer<OperatorSnapshotFinalizer>) invocation -> {
-				createSubtask.trigger();
-				completeSubtask.await();
-				Object[] arguments = invocation.getArguments();
-				return new OperatorSnapshotFinalizer((OperatorSnapshotFutures) arguments[0]);
-			}
-		);
-
-		KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class);
-		KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class);
-		OperatorStateHandle managedOperatorStateHandle = mock(OperatorStreamStateHandle.class);
-		OperatorStateHandle rawOperatorStateHandle = mock(OperatorStreamStateHandle.class);
+		final TestingKeyedStateHandle managedKeyedStateHandle = new TestingKeyedStateHandle();
+		final TestingKeyedStateHandle rawKeyedStateHandle = new TestingKeyedStateHandle();
+		final TestingOperatorStateHandle managedOperatorStateHandle = new TestingOperatorStateHandle();
+		final TestingOperatorStateHandle rawOperatorStateHandle = new TestingOperatorStateHandle();
+		final BlockingRunnableFuture<SnapshotResult<KeyedStateHandle>> rawKeyedStateHandleFuture = new BlockingRunnableFuture<>(2, SnapshotResult.of(rawKeyedStateHandle));
OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(
DoneFuture.of(SnapshotResult.of(managedKeyedStateHandle)),
-			DoneFuture.of(SnapshotResult.of(rawKeyedStateHandle)),
+			rawKeyedStateHandleFuture,
DoneFuture.of(SnapshotResult.of(managedOperatorStateHandle)),
DoneFuture.of(SnapshotResult.of(rawOperatorStateHandle)));
final StreamOperator<?> streamOperator = streamOperatorWithSnapshot(operatorSnapshotResult);
-		try (MockEnvironment mockEnvironment = spy(new MockEnvironmentBuilder().build())) {
+		final AcknowledgeDummyEnvironment mockEnvironment = new AcknowledgeDummyEnvironment();
-			RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
-				mockEnvironment,
-				operatorChain(streamOperator)));
-
-			waitTaskIsRunning(task.streamTask, task.invocationFuture);
-
-			final long checkpointId = 42L;
-			task.streamTask.triggerCheckpoint(
-				new CheckpointMetaData(checkpointId, 1L),
-				CheckpointOptions.forCheckpointWithDefaultLocation(),
-				false);
+		RunningTask<MockStreamTask> task = runTask(() -> new MockStreamTask(
+			mockEnvironment,
+			operatorChain(streamOperator)));
-			createSubtask.await();
+		waitTaskIsRunning(task.streamTask, task.invocationFuture);
-			task.streamTask.cancel();
+		final long checkpointId = 42L;
+		task.streamTask.triggerCheckpoint(
+			new CheckpointMetaData(checkpointId, 1L),
+			CheckpointOptions.forCheckpointWithDefaultLocation(),
+			false);
-			completeSubtask.trigger();
+		rawKeyedStateHandleFuture.awaitRun(); // run() entered once; with 2 parties it blocks until run() is called again
-			// wait for the completion of the async task
-			ExecutorService executor = task.streamTask.getAsyncOperationsThreadPool();
-			executor.shutdown();
-			if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
-				fail("Executor did not shut down within the given timeout. This indicates that the " +
-					"checkpointing did not resume.");
-			}
+		task.streamTask.cancel();
-			// check that the checkpoint has not been acknowledged
-			verify(mockEnvironment, never()).acknowledgeCheckpoint(eq(checkpointId), any(CheckpointMetrics.class), any(TaskStateSnapshot.class));
+		final FutureUtils.ConjunctFuture<Void> discardFuture = FutureUtils.waitForAll(
+			Arrays.asList(
+				managedKeyedStateHandle.getDiscardFuture(),
+				rawKeyedStateHandle.getDiscardFuture(),
+				managedOperatorStateHandle.getDiscardFuture(),
+				rawOperatorStateHandle.getDiscardFuture()));
-			// check that the state handles have been discarded
-			verify(managedKeyedStateHandle).discardState();
-			verify(rawKeyedStateHandle).discardState();
-			verify(managedOperatorStateHandle).discardState();
-			verify(rawOperatorStateHandle).discardState();
+		// make sure that all state handles of the cancelled checkpoint have been discarded
+		discardFuture.get();
-			task.waitForTaskCompletion(true);
+		try {
+			mockEnvironment.getAcknowledgeCheckpointFuture().get(10L, TimeUnit.MILLISECONDS);
+			fail("The checkpoint should not get acknowledged.");
+		} catch (TimeoutException expected) {
+			// expected: the cancelled checkpoint must never be acknowledged
}
+		// NOTE(review): the 10 ms wait above only weakly guards against a late acknowledgement
+		task.waitForTaskCompletion(true);
}
/**
@@ -1308,4 +1302,157 @@ public class StreamTaskTest extends TestLogger {
super.initializeState(context);
}
}
+	/** Keyed state handle without content which completes a future once it has been discarded. */
+	private static class TestingKeyedStateHandle implements KeyedStateHandle {
+
+		private static final long serialVersionUID = -2473861305282291582L;
+
+		private final transient CompletableFuture<Void> discarded = new CompletableFuture<>();
+
+		@Override
+		public void discardState() {
+			discarded.complete(null);
+		}
+
+		public CompletableFuture<Void> getDiscardFuture() {
+			return discarded;
+		}
+
+		@Override
+		public long getStateSize() {
+			return 0L;
+		}
+
+		@Override
+		public KeyGroupRange getKeyGroupRange() {
+			return KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
+		}
+
+		@Override
+		public TestingKeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+			return this;
+		}
+
+		@Override
+		public void registerSharedStates(SharedStateRegistry stateRegistry) {
+			// nothing to register: this handle does not reference any shared state
+		}
+	}
+	/** Operator state handle without content which completes a future once it has been discarded. */
+	private static class TestingOperatorStateHandle implements OperatorStateHandle {
+
+		private static final long serialVersionUID = 923794934187614088L;
+
+		private final transient CompletableFuture<Void> discardFuture = new CompletableFuture<>();
+
+		public CompletableFuture<Void> getDiscardFuture() {
+			return discardFuture;
+		}
+
+		@Override
+		public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() {
+			return Collections.emptyMap();
+		}
+
+		@Override
+		public FSDataInputStream openInputStream() throws IOException {
+			throw new IOException("Cannot open input streams in testing implementation.");
+		}
+
+		@Override
+		public StreamStateHandle getDelegateStateHandle() {
+			throw new UnsupportedOperationException("Not implemented.");
+		}
+
+		@Override
+		public void discardState() {
+			discardFuture.complete(null);
+		}
+
+		@Override
+		public long getStateSize() {
+			return 0L;
+		}
+	}
+	/** Dummy environment that completes a future with the id of the first acknowledged checkpoint. */
+	private static class AcknowledgeDummyEnvironment extends DummyEnvironment {
+
+		private final CompletableFuture<Long> acknowledgedCheckpointId = new CompletableFuture<>();
+
+		@Override
+		public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
+			acknowledgedCheckpointId.complete(checkpointId);
+		}
+
+		@Override
+		public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
+			acknowledgeCheckpoint(checkpointId, checkpointMetrics);
+		}
+
+		public CompletableFuture<Long> getAcknowledgeCheckpointFuture() {
+			return acknowledgedCheckpointId;
+		}
+	}
+	/** {@link RunnableFuture} whose {@link #run()} blocks until it has been called {@code parties} times or is interrupted. */
+	private static final class BlockingRunnableFuture<V> implements RunnableFuture<V> {
+
+		private final CompletableFuture<V> future = new CompletableFuture<>();
+		// triggered whenever run() is entered, see awaitRun()
+		private final OneShotLatch signalRunLatch = new OneShotLatch();
+		// counted down once per run() call; run() then waits until the count has reached zero
+		private final CountDownLatch continueRunLatch;
+
+		private final V value;
+
+		private BlockingRunnableFuture(int parties, V value) {
+			this.continueRunLatch = new CountDownLatch(parties);
+			this.value = value;
+		}
+
+		@Override
+		public void run() {
+			signalRunLatch.trigger();
+			continueRunLatch.countDown();
+
+			try {
+				// poor man's barrier because it can happen that the async operations thread gets
+				// interrupted by the mailbox thread. A CyclicBarrier would in this case break for all
+				// parties and leave the future uncompleted, whereas a later run() call still releases this latch
+				continueRunLatch.await();
+			} catch (InterruptedException e) {
+				ExceptionUtils.rethrow(e);
+			}
+			// NOTE(review): interrupt flag is not restored above; restoring it would make a retried run() on this thread fail in await()
+			future.complete(value);
+		}
+		// cancellation is always refused; the future is only ever completed by run()
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			return false;
+		}
+
+		@Override
+		public boolean isCancelled() {
+			return future.isCancelled();
+		}
+
+		@Override
+		public boolean isDone() {
+			return future.isDone();
+		}
+
+		@Override
+		public V get() throws InterruptedException, ExecutionException {
+			return future.get();
+		}
+
+		@Override
+		public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+			return future.get(timeout, unit);
+		}
+		// blocks until run() has been entered at least once
+		void awaitRun() throws InterruptedException {
+			signalRunLatch.await();
+		}
+	}
}