You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/01/27 19:11:09 UTC

flink git commit: [FLINK-5667] [state] Synchronize asynchronous checkpointing and close operation

Repository: flink
Updated Branches:
  refs/heads/master 54b709275 -> 0aa9db078


[FLINK-5667] [state] Synchronize asynchronous checkpointing and close operation

This PR synchronizes the asynchronous checkpointing and close operations of a StreamTask.
The synchronization prevents an acknowledged checkpoint from being discarded and a
discarded checkpoint from being acknowledged. It achieves this by introducing an atomic
state variable that guards against late close and acknowledge operations.

This closes #3226.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0aa9db07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0aa9db07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0aa9db07

Branch: refs/heads/master
Commit: 0aa9db07800bcc3979dc89bba0b3697149b18ecd
Parents: 54b7092
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 27 16:26:22 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 27 20:10:41 2017 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     |  70 ++++--
 .../streaming/runtime/tasks/StreamTaskTest.java | 237 +++++++++++++++++++
 2 files changed, 287 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0aa9db07/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7647fbb..705dfca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -75,6 +75,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Base class for all streaming tasks. A task is the unit of local processing that is deployed
@@ -879,6 +880,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		private final long asyncStartNanos;
 
+		private final AtomicReference<CheckpointingOperation.AsynCheckpointState> asyncCheckpointState = new AtomicReference<>(
+			CheckpointingOperation.AsynCheckpointState.RUNNING);
+
 		AsyncCheckpointRunnable(
 				StreamTask<?, ?> owner,
 				List<StreamStateHandle> nonPartitionedStateHandles,
@@ -948,13 +952,25 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 						keyedStateHandleBackend,
 						keyedStateHandleStream);
 
-				owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
+				if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
+					owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
 
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
 							owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
+					}
+				} else {
+					LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
+						owner.getName(),
+						checkpointMetaData.getCheckpointId());
 				}
 			} catch (Exception e) {
+				// the state is completed if an exception occurred in the acknowledgeCheckpoint call
+				// in order to clean up, we have to set it to RUNNING again.
+				asyncCheckpointState.compareAndSet(
+					CheckpointingOperation.AsynCheckpointState.COMPLETED,
+					CheckpointingOperation.AsynCheckpointState.RUNNING);
+
 				try {
 					cleanup();
 				} catch (Exception cleanupException) {
@@ -984,28 +1000,36 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 
 		private void cleanup() throws Exception {
-			Exception exception = null;
+			if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
+				LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", checkpointMetaData.getCheckpointId(), owner.getName());
+				Exception exception = null;
 
-			// clean up ongoing operator snapshot results and non partitioned state handles
-			for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
-				if (operatorSnapshotResult != null) {
-					try {
-						operatorSnapshotResult.cancel();
-					} catch (Exception cancelException) {
-						exception = ExceptionUtils.firstOrSuppressed(cancelException, exception);
+				// clean up ongoing operator snapshot results and non partitioned state handles
+				for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
+					if (operatorSnapshotResult != null) {
+						try {
+							operatorSnapshotResult.cancel();
+						} catch (Exception cancelException) {
+							exception = ExceptionUtils.firstOrSuppressed(cancelException, exception);
+						}
 					}
 				}
-			}
 
-			// discard non partitioned state handles
-			try {
-				StateUtil.bestEffortDiscardAllStateObjects(nonPartitionedStateHandles);
-			} catch (Exception discardException) {
-				exception = ExceptionUtils.firstOrSuppressed(discardException, exception);
-			}
+				// discard non partitioned state handles
+				try {
+					StateUtil.bestEffortDiscardAllStateObjects(nonPartitionedStateHandles);
+				} catch (Exception discardException) {
+					exception = ExceptionUtils.firstOrSuppressed(discardException, exception);
+				}
 
-			if (null != exception) {
-				throw exception;
+				if (null != exception) {
+					throw exception;
+				}
+			} else {
+				LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} has " +
+						"already been completed. Thus, the state handles are not cleaned up.",
+					owner.getName(),
+					checkpointMetaData.getCheckpointId());
 			}
 		}
 	}
@@ -1174,5 +1198,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			owner.cancelables.registerClosable(asyncCheckpointRunnable);
 			owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
 		}
+
+		private enum AsynCheckpointState {
+			RUNNING,
+			DISCARDED,
+			COMPLETED
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0aa9db07/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index f5990ca..cb9850f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -55,8 +56,11 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackendFactory;
@@ -85,10 +89,14 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
@@ -104,6 +112,8 @@ import java.util.Comparator;
 import java.util.PriorityQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -114,11 +124,17 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.withSettings;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StreamTask.class)
 public class StreamTaskTest extends TestLogger {
 
 	private static OneShotLatch SYNC_LATCH;
@@ -413,6 +429,227 @@ public class StreamTaskTest extends TestLogger {
 		verify(streamStateHandle3).discardState();
 	}
 
+	/**
+	 * FLINK-5667
+	 *
+	 * Tests that a concurrent cancel operation does not discard the state handles of an
+	 * acknowledged checkpoint. The situation can only happen if the cancel call is executed
+	 * after Environment.acknowledgeCheckpoint() and before the
+	 * CloseableRegistry.unregisterClosable() call.
+	 */
+	@Test
+	public void testAsyncCheckpointingConcurrentCloseAfterAcknowledge() throws Exception {
+		final long checkpointId = 42L;
+		final long timestamp = 1L;
+
+		final OneShotLatch acknowledgeCheckpointLatch = new OneShotLatch();
+		final OneShotLatch completeAcknowledge = new OneShotLatch();
+
+		TaskInfo mockTaskInfo = mock(TaskInfo.class);
+		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+		Environment mockEnvironment = mock(Environment.class);
+		when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+		doAnswer(new Answer() {
+			@Override
+			public Object answer(InvocationOnMock invocation) throws Throwable {
+				acknowledgeCheckpointLatch.trigger();
+
+				// block here so that we can issue the concurrent cancel call
+				completeAcknowledge.await();
+
+				return null;
+			}
+		}).when(mockEnvironment).acknowledgeCheckpoint(any(CheckpointMetaData.class), any(SubtaskState.class));
+
+		StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+		streamTask.setEnvironment(mockEnvironment);
+
+		StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+		KeyGroupsStateHandle managedKeyedStateHandle = mock(KeyGroupsStateHandle.class);
+		KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class);
+		OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class);
+		OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class);
+
+		OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult(
+			new DoneFuture<>(managedKeyedStateHandle),
+			new DoneFuture<>(rawKeyedStateHandle),
+			new DoneFuture<>(managedOperatorStateHandle),
+			new DoneFuture<>(rawOperatorStateHandle));
+
+		when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
+
+		StreamOperator<?>[] streamOperators = {streamOperator};
+
+		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+		StreamStateHandle streamStateHandle = mock(StreamStateHandle.class);
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+		when(outStream.closeAndGetHandle()).thenReturn(streamStateHandle);
+
+		CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
+		when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(outStream);
+
+		AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
+		when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+
+		Whitebox.setInternalState(streamTask, "isRunning", true);
+		Whitebox.setInternalState(streamTask, "lock", new Object());
+		Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newFixedThreadPool(1));
+		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+
+		streamTask.triggerCheckpoint(checkpointMetaData);
+
+		acknowledgeCheckpointLatch.await();
+
+		ArgumentCaptor<SubtaskState> subtaskStateCaptor = ArgumentCaptor.forClass(SubtaskState.class);
+
+		// check that the checkpoint has been completed
+		verify(mockEnvironment).acknowledgeCheckpoint(eq(checkpointMetaData), subtaskStateCaptor.capture());
+
+		SubtaskState subtaskState = subtaskStateCaptor.getValue();
+
+		// check that the subtask state contains the expected state handles
+		assertEquals(managedKeyedStateHandle, subtaskState.getManagedKeyedState());
+		assertEquals(rawKeyedStateHandle, subtaskState.getRawKeyedState());
+		assertEquals(new ChainedStateHandle<>(Collections.singletonList(managedOperatorStateHandle)), subtaskState.getManagedOperatorState());
+		assertEquals(new ChainedStateHandle<>(Collections.singletonList(rawOperatorStateHandle)), subtaskState.getRawOperatorState());
+
+		// check that the state handles have not been discarded
+		verify(managedKeyedStateHandle, never()).discardState();
+		verify(rawKeyedStateHandle, never()).discardState();
+		verify(managedOperatorStateHandle, never()).discardState();
+		verify(rawOperatorStateHandle, never()).discardState();
+
+		streamTask.cancel();
+
+		completeAcknowledge.trigger();
+
+		// canceling the stream task after it has acknowledged the checkpoint should not discard
+		// the state handles
+		verify(managedKeyedStateHandle, never()).discardState();
+		verify(rawKeyedStateHandle, never()).discardState();
+		verify(managedOperatorStateHandle, never()).discardState();
+		verify(rawOperatorStateHandle, never()).discardState();
+	}
+
+	/**
+	 * FLINK-5667
+	 *
+	 * Tests that a concurrent cancel operation discards the state handles of a not yet
+	 * acknowledged checkpoint and prevents sending an acknowledge message to the
+	 * CheckpointCoordinator. The situation can only happen if the cancel call is executed
+	 * before Environment.acknowledgeCheckpoint().
+	 */
+	@Test
+	public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exception {
+		final long checkpointId = 42L;
+		final long timestamp = 1L;
+
+		final OneShotLatch createSubtask = new OneShotLatch();
+		final OneShotLatch completeSubtask = new OneShotLatch();
+
+		TaskInfo mockTaskInfo = mock(TaskInfo.class);
+		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+		Environment mockEnvironment = mock(Environment.class);
+		when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+
+		whenNew(SubtaskState.class).withAnyArguments().thenAnswer(new Answer<SubtaskState>() {
+			@Override
+			public SubtaskState answer(InvocationOnMock invocation) throws Throwable {
+				createSubtask.trigger();
+				completeSubtask.await();
+
+				return new SubtaskState(
+					(ChainedStateHandle<StreamStateHandle>)invocation.getArguments()[0],
+					(ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[1],
+					(ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[2],
+					(KeyGroupsStateHandle)invocation.getArguments()[3],
+					(KeyGroupsStateHandle)invocation.getArguments()[4]);
+			}
+		});
+
+		StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+		streamTask.setEnvironment(mockEnvironment);
+
+		StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+		KeyGroupsStateHandle managedKeyedStateHandle = mock(KeyGroupsStateHandle.class);
+		KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class);
+		OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class);
+		OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class);
+
+		OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult(
+			new DoneFuture<>(managedKeyedStateHandle),
+			new DoneFuture<>(rawKeyedStateHandle),
+			new DoneFuture<>(managedOperatorStateHandle),
+			new DoneFuture<>(rawOperatorStateHandle));
+
+		when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
+
+		StreamOperator<?>[] streamOperators = {streamOperator};
+
+		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+		StreamStateHandle streamStateHandle = mock(StreamStateHandle.class);
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outStream = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+		when(outStream.closeAndGetHandle()).thenReturn(streamStateHandle);
+
+		CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
+		when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(outStream);
+
+		AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
+		when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+
+		ExecutorService executor = Executors.newFixedThreadPool(1);
+
+		Whitebox.setInternalState(streamTask, "isRunning", true);
+		Whitebox.setInternalState(streamTask, "lock", new Object());
+		Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", executor);
+		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+
+		streamTask.triggerCheckpoint(checkpointMetaData);
+
+		createSubtask.await();
+
+		streamTask.cancel();
+
+		completeSubtask.trigger();
+
+		// wait for the completion of the async task
+		executor.shutdown();
+
+		if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
+			fail("Executor did not shut down within the given timeout. This indicates that the " +
+				"checkpointing did not resume.");
+		}
+
+		// check that the checkpoint has not been acknowledged
+		verify(mockEnvironment, never()).acknowledgeCheckpoint(any(CheckpointMetaData.class), any(SubtaskState.class));
+
+		// check that the state handles have been discarded
+		verify(managedKeyedStateHandle).discardState();
+		verify(rawKeyedStateHandle).discardState();
+		verify(managedOperatorStateHandle).discardState();
+		verify(rawOperatorStateHandle).discardState();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Test Utilities
 	// ------------------------------------------------------------------------