You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/01/27 19:11:09 UTC
flink git commit: [FLINK-5667] [state] Synchronize asynchronous
checkpointing and close operation
Repository: flink
Updated Branches:
refs/heads/master 54b709275 -> 0aa9db078
[FLINK-5667] [state] Synchronize asynchronous checkpointing and close operation
This PR synchronizes the asynchronous checkpointing and close operations of a StreamTask.
The synchronization prevents an acknowledged checkpoint from being discarded and
a discarded checkpoint from being acknowledged. It achieves this by introducing an atomic
state variable which guards against late close and acknowledge operations.
This closes #3226.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0aa9db07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0aa9db07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0aa9db07
Branch: refs/heads/master
Commit: 0aa9db07800bcc3979dc89bba0b3697149b18ecd
Parents: 54b7092
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 27 16:26:22 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 27 20:10:41 2017 +0100
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTask.java | 70 ++++--
.../streaming/runtime/tasks/StreamTaskTest.java | 237 +++++++++++++++++++
2 files changed, 287 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0aa9db07/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7647fbb..705dfca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -75,6 +75,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Base class for all streaming tasks. A task is the unit of local processing that is deployed
@@ -879,6 +880,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private final long asyncStartNanos;
+ private final AtomicReference<CheckpointingOperation.AsynCheckpointState> asyncCheckpointState = new AtomicReference<>(
+ CheckpointingOperation.AsynCheckpointState.RUNNING);
+
AsyncCheckpointRunnable(
StreamTask<?, ?> owner,
List<StreamStateHandle> nonPartitionedStateHandles,
@@ -948,13 +952,25 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
keyedStateHandleBackend,
keyedStateHandleStream);
- owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
+ if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
+ owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
+ }
+ } else {
+ LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
+ owner.getName(),
+ checkpointMetaData.getCheckpointId());
}
} catch (Exception e) {
+ // the state is completed if an exception occurred in the acknowledgeCheckpoint call
+ // in order to clean up, we have to set it to RUNNING again.
+ asyncCheckpointState.compareAndSet(
+ CheckpointingOperation.AsynCheckpointState.COMPLETED,
+ CheckpointingOperation.AsynCheckpointState.RUNNING);
+
try {
cleanup();
} catch (Exception cleanupException) {
@@ -984,28 +1000,36 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
private void cleanup() throws Exception {
- Exception exception = null;
+ if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
+ LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", checkpointMetaData.getCheckpointId(), owner.getName());
+ Exception exception = null;
- // clean up ongoing operator snapshot results and non partitioned state handles
- for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
- if (operatorSnapshotResult != null) {
- try {
- operatorSnapshotResult.cancel();
- } catch (Exception cancelException) {
- exception = ExceptionUtils.firstOrSuppressed(cancelException, exception);
+ // clean up ongoing operator snapshot results and non partitioned state handles
+ for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
+ if (operatorSnapshotResult != null) {
+ try {
+ operatorSnapshotResult.cancel();
+ } catch (Exception cancelException) {
+ exception = ExceptionUtils.firstOrSuppressed(cancelException, exception);
+ }
}
}
- }
- // discard non partitioned state handles
- try {
- StateUtil.bestEffortDiscardAllStateObjects(nonPartitionedStateHandles);
- } catch (Exception discardException) {
- exception = ExceptionUtils.firstOrSuppressed(discardException, exception);
- }
+ // discard non partitioned state handles
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(nonPartitionedStateHandles);
+ } catch (Exception discardException) {
+ exception = ExceptionUtils.firstOrSuppressed(discardException, exception);
+ }
- if (null != exception) {
- throw exception;
+ if (null != exception) {
+ throw exception;
+ }
+ } else {
+ LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} has " +
+ "already been completed. Thus, the state handles are not cleaned up.",
+ owner.getName(),
+ checkpointMetaData.getCheckpointId());
}
}
}
@@ -1174,5 +1198,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
owner.cancelables.registerClosable(asyncCheckpointRunnable);
owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
}
+
+ private enum AsynCheckpointState {
+ RUNNING,
+ DISCARDED,
+ COMPLETED
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0aa9db07/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index f5990ca..cb9850f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -55,8 +56,11 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackendFactory;
@@ -85,10 +89,14 @@ import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
@@ -104,6 +112,8 @@ import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
@@ -114,11 +124,17 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StreamTask.class)
public class StreamTaskTest extends TestLogger {
private static OneShotLatch SYNC_LATCH;
@@ -413,6 +429,227 @@ public class StreamTaskTest extends TestLogger {
verify(streamStateHandle3).discardState();
}
+ /**
+ * FLINK-5667
+ *
+ * Tests that a concurrent cancel operation does not discard the state handles of an
+ * acknowledged checkpoint. The situation can only happen if the cancel call is executed
+ * after Environment.acknowledgeCheckpoint() and before the
+ * CloseableRegistry.unregisterClosable() call.
+ */
+ @Test
+ public void testAsyncCheckpointingConcurrentCloseAfterAcknowledge() throws Exception {
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ final OneShotLatch acknowledgeCheckpointLatch = new OneShotLatch();
+ final OneShotLatch completeAcknowledge = new OneShotLatch();
+
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+ when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+ Environment mockEnvironment = mock(Environment.class);
+ when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ acknowledgeCheckpointLatch.trigger();
+
+ // block here so that we can issue the concurrent cancel call
+ completeAcknowledge.await();
+
+ return null;
+ }
+ }).when(mockEnvironment).acknowledgeCheckpoint(any(CheckpointMetaData.class), any(SubtaskState.class));
+
+ StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+ streamTask.setEnvironment(mockEnvironment);
+
+ StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+ KeyGroupsStateHandle managedKeyedStateHandle = mock(KeyGroupsStateHandle.class);
+ KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class);
+ OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class);
+ OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class);
+
+ OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult(
+ new DoneFuture<>(managedKeyedStateHandle),
+ new DoneFuture<>(rawKeyedStateHandle),
+ new DoneFuture<>(managedOperatorStateHandle),
+ new DoneFuture<>(rawOperatorStateHandle));
+
+ when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
+
+ StreamOperator<?>[] streamOperators = {streamOperator};
+
+ OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+ when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+ StreamStateHandle streamStateHandle = mock(StreamStateHandle.class);
+
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+ when(outStream.closeAndGetHandle()).thenReturn(streamStateHandle);
+
+ CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
+ when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(outStream);
+
+ AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
+ when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+
+ Whitebox.setInternalState(streamTask, "isRunning", true);
+ Whitebox.setInternalState(streamTask, "lock", new Object());
+ Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+ Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+ Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newFixedThreadPool(1));
+ Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+ Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+
+ streamTask.triggerCheckpoint(checkpointMetaData);
+
+ acknowledgeCheckpointLatch.await();
+
+ ArgumentCaptor<SubtaskState> subtaskStateCaptor = ArgumentCaptor.forClass(SubtaskState.class);
+
+ // check that the checkpoint has been completed
+ verify(mockEnvironment).acknowledgeCheckpoint(eq(checkpointMetaData), subtaskStateCaptor.capture());
+
+ SubtaskState subtaskState = subtaskStateCaptor.getValue();
+
+ // check that the subtask state contains the expected state handles
+ assertEquals(managedKeyedStateHandle, subtaskState.getManagedKeyedState());
+ assertEquals(rawKeyedStateHandle, subtaskState.getRawKeyedState());
+ assertEquals(new ChainedStateHandle<>(Collections.singletonList(managedOperatorStateHandle)), subtaskState.getManagedOperatorState());
+ assertEquals(new ChainedStateHandle<>(Collections.singletonList(rawOperatorStateHandle)), subtaskState.getRawOperatorState());
+
+ // check that the state handles have not been discarded
+ verify(managedKeyedStateHandle, never()).discardState();
+ verify(rawKeyedStateHandle, never()).discardState();
+ verify(managedOperatorStateHandle, never()).discardState();
+ verify(rawOperatorStateHandle, never()).discardState();
+
+ streamTask.cancel();
+
+ completeAcknowledge.trigger();
+
+ // canceling the stream task after it has acknowledged the checkpoint should not discard
+ // the state handles
+ verify(managedKeyedStateHandle, never()).discardState();
+ verify(rawKeyedStateHandle, never()).discardState();
+ verify(managedOperatorStateHandle, never()).discardState();
+ verify(rawOperatorStateHandle, never()).discardState();
+ }
+
+ /**
+ * FLINK-5667
+ *
+ * Tests that a concurrent cancel operation discards the state handles of a not yet
+ * acknowledged checkpoint and prevents sending an acknowledge message to the
+ * CheckpointCoordinator. The situation can only happen if the cancel call is executed
+ * before Environment.acknowledgeCheckpoint().
+ */
+ @Test
+ public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exception {
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ final OneShotLatch createSubtask = new OneShotLatch();
+ final OneShotLatch completeSubtask = new OneShotLatch();
+
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+ when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+ Environment mockEnvironment = mock(Environment.class);
+ when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+
+ whenNew(SubtaskState.class).withAnyArguments().thenAnswer(new Answer<SubtaskState>() {
+ @Override
+ public SubtaskState answer(InvocationOnMock invocation) throws Throwable {
+ createSubtask.trigger();
+ completeSubtask.await();
+
+ return new SubtaskState(
+ (ChainedStateHandle<StreamStateHandle>)invocation.getArguments()[0],
+ (ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[1],
+ (ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[2],
+ (KeyGroupsStateHandle)invocation.getArguments()[3],
+ (KeyGroupsStateHandle)invocation.getArguments()[4]);
+ }
+ });
+
+ StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+ streamTask.setEnvironment(mockEnvironment);
+
+ StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+ KeyGroupsStateHandle managedKeyedStateHandle = mock(KeyGroupsStateHandle.class);
+ KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class);
+ OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class);
+ OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class);
+
+ OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult(
+ new DoneFuture<>(managedKeyedStateHandle),
+ new DoneFuture<>(rawKeyedStateHandle),
+ new DoneFuture<>(managedOperatorStateHandle),
+ new DoneFuture<>(rawOperatorStateHandle));
+
+ when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
+
+ StreamOperator<?>[] streamOperators = {streamOperator};
+
+ OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+ when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+ StreamStateHandle streamStateHandle = mock(StreamStateHandle.class);
+
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+ when(outStream.closeAndGetHandle()).thenReturn(streamStateHandle);
+
+ CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
+ when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(outStream);
+
+ AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
+ when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+
+ Whitebox.setInternalState(streamTask, "isRunning", true);
+ Whitebox.setInternalState(streamTask, "lock", new Object());
+ Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+ Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+ Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", executor);
+ Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+ Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+
+ streamTask.triggerCheckpoint(checkpointMetaData);
+
+ createSubtask.await();
+
+ streamTask.cancel();
+
+ completeSubtask.trigger();
+
+ // wait for the completion of the async task
+ executor.shutdown();
+
+ if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
+ fail("Executor did not shut down within the given timeout. This indicates that the " +
+ "checkpointing did not resume.");
+ }
+
+ // check that the checkpoint has not been acknowledged
+ verify(mockEnvironment, never()).acknowledgeCheckpoint(any(CheckpointMetaData.class), any(SubtaskState.class));
+
+ // check that the state handles have been discarded
+ verify(managedKeyedStateHandle).discardState();
+ verify(rawKeyedStateHandle).discardState();
+ verify(managedOperatorStateHandle).discardState();
+ verify(rawOperatorStateHandle).discardState();
+ }
+
// ------------------------------------------------------------------------
// Test Utilities
// ------------------------------------------------------------------------