You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/16 23:35:33 UTC

[2/3] flink git commit: [FLINK-5063] [checkpointing] Discard state handles of declined or expired state handles

[FLINK-5063] [checkpointing] Discard state handles of declined or expired state handles

Whenever the checkpoint coordinator receives an acknowledge checkpoint message which belongs
to the job maintained by the checkpoint coordinator, it should either record the state handles
for later processing or discard them to free the resources. The latter case can happen if a
checkpoint has expired and late acknowledge checkpoint messages arrive. Furthermore, it
can happen if a Task sent a decline checkpoint message while other Tasks were still drawing
a checkpoint. This PR changes the behaviour such that state handles belonging to the job of
the checkpoint coordinator are discarded if they could not be added to the PendingCheckpoint.

This closes #2812


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72b295b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72b295b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72b295b3

Branch: refs/heads/master
Commit: 72b295b3b52dff2d0bc5b78881826e8936c370ff
Parents: bf06a1c
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 14 18:33:55 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 17 00:34:55 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  80 ++++++++++----
 .../runtime/checkpoint/PendingCheckpoint.java   |  57 +++++++---
 .../checkpoint/CheckpointCoordinatorTest.java   | 107 +++++++++++++++++++
 3 files changed, 209 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72b295b3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 8088c3d..886409d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -638,35 +638,62 @@ public class CheckpointCoordinator {
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
 				isPendingCheckpoint = true;
 
-				if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
-					if (checkpoint.isFullyAcknowledged()) {
-						completed = checkpoint.finalizeCheckpoint();
+				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
+					case SUCCESS:
+						if (checkpoint.isFullyAcknowledged()) {
+							completed = checkpoint.finalizeCheckpoint();
 
-						completedCheckpointStore.addCheckpoint(completed);
+							completedCheckpointStore.addCheckpoint(completed);
 
-						LOG.info("Completed checkpoint " + checkpointId + " (in " +
+							LOG.info("Completed checkpoint " + checkpointId + " (in " +
 								completed.getDuration() + " ms)");
 
-						if (LOG.isDebugEnabled()) {
-							StringBuilder builder = new StringBuilder();
-							for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
-								builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
-							}
+							if (LOG.isDebugEnabled()) {
+								StringBuilder builder = new StringBuilder();
+								for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
+									builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
+								}
 
-							LOG.debug(builder.toString());
-						}
+								LOG.debug(builder.toString());
+							}
 
-						pendingCheckpoints.remove(checkpointId);
-						rememberRecentCheckpointId(checkpointId);
+							pendingCheckpoints.remove(checkpointId);
+							rememberRecentCheckpointId(checkpointId);
 
-						dropSubsumedCheckpoints(completed.getCheckpointID());
+							dropSubsumedCheckpoints(completed.getCheckpointID());
 
-						triggerQueuedRequests();
-					}
-				} else {
-					// checkpoint did not accept message
-					LOG.error("Received duplicate or invalid acknowledge message for checkpoint {} , task {}",
-							checkpointId, message.getTaskExecutionId());
+							triggerQueuedRequests();
+						}
+						break;
+					case DUPLICATE:
+						LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
+							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
+						break;
+					case UNKNOWN:
+						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
+								"because the task's execution attempt id was unknown. Discarding " +
+								"the state handle to avoid lingering state.", message.getCheckpointId(),
+							message.getTaskExecutionId(), message.getJob());
+
+						try {
+							message.getSubtaskState().discardState();
+						} catch (Exception e) {
+							LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
+								message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
+						}
+						break;
+					case DISCARDED:
+						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
+							"because the pending checkpoint had been discarded. Discarding the " +
+								"state handle tp avoid lingering state.",
+							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
+
+						try {
+							message.getSubtaskState().discardState();
+						} catch (Exception e) {
+							LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
+								message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
+						}
 				}
 			}
 			else if (checkpoint != null) {
@@ -678,11 +705,20 @@ public class CheckpointCoordinator {
 				// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
 				if (recentPendingCheckpoints.contains(checkpointId)) {
 					isPendingCheckpoint = true;
-					LOG.warn("Received late message for now expired checkpoint attempt " + checkpointId);
+					LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId);
 				}
 				else {
+					LOG.debug("Received message for an unknown checkpoint {}.", checkpointId);
 					isPendingCheckpoint = false;
 				}
+
+				try {
+					// try to discard the state so that we don't have lingering state lying around
+					message.getSubtaskState().discardState();
+				} catch (Exception e) {
+					LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
+						message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
+				}
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/72b295b3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 43a2557..5034502 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -35,7 +35,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,6 +66,9 @@ public class PendingCheckpoint {
 
 	private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
 
+	/** Set of acknowledged tasks */
+	private final Set<ExecutionAttemptID> acknowledgedTasks;
+
 	/** Flag indicating whether the checkpoint is triggered as part of periodic scheduling. */
 	private final boolean isPeriodic;
 
@@ -109,6 +114,8 @@ public class PendingCheckpoint {
 
 		checkArgument(verticesToConfirm.size() > 0,
 				"Checkpoint needs at least one vertex that commits the checkpoint");
+
+		acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -228,24 +235,37 @@ public class PendingCheckpoint {
 			}
 		}
 	}
-	
-	public boolean acknowledgeTask(
-			ExecutionAttemptID attemptID,
-			SubtaskState checkpointedSubtaskState) {
+
+	/**
+	 * Acknowledges the task with the given execution attempt id and the given subtask state.
+	 *
+	 * @param executionAttemptId of the acknowledged task
+	 * @param subtaskState of the acknowledged task
+	 * @return TaskAcknowledgeResult of the operation
+	 */
+	public TaskAcknowledgeResult acknowledgeTask(
+			ExecutionAttemptID executionAttemptId,
+			SubtaskState subtaskState) {
 
 		synchronized (lock) {
 
 			if (discarded) {
-				return false;
+				return TaskAcknowledgeResult.DISCARDED;
 			}
 
-			final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
+			final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);
 
 			if (vertex == null) {
-				return false;
+				if (acknowledgedTasks.contains(executionAttemptId)) {
+					return TaskAcknowledgeResult.DUPLICATE;
+				} else {
+					return TaskAcknowledgeResult.UNKNOWN;
+				}
+			} else {
+				acknowledgedTasks.add(executionAttemptId);
 			}
 
-			if (null != checkpointedSubtaskState) {
+			if (null != subtaskState) {
 
 				JobVertexID jobVertexID = vertex.getJobvertexId();
 				int subtaskIndex = vertex.getParallelSubtaskIndex();
@@ -253,9 +273,9 @@ public class PendingCheckpoint {
 
 				if (null == taskState) {
 					ChainedStateHandle<StreamStateHandle> nonPartitionedState =
-							checkpointedSubtaskState.getLegacyOperatorState();
+							subtaskState.getLegacyOperatorState();
 					ChainedStateHandle<OperatorStateHandle> partitioneableState =
-							checkpointedSubtaskState.getManagedOperatorState();
+							subtaskState.getManagedOperatorState();
 					//TODO this should go away when we remove chained state, assigning state to operators directly instead
 					int chainLength;
 					if (nonPartitionedState != null) {
@@ -276,17 +296,27 @@ public class PendingCheckpoint {
 				}
 
 				long duration = System.currentTimeMillis() - checkpointTimestamp;
-				checkpointedSubtaskState.setDuration(duration);
+				subtaskState.setDuration(duration);
 
-				taskState.putState(subtaskIndex, checkpointedSubtaskState);
+				taskState.putState(subtaskIndex, subtaskState);
 			}
 
 			++numAcknowledgedTasks;
 
-			return true;
+			return TaskAcknowledgeResult.SUCCESS;
 		}
 	}
 
+	/**
+	 * Result of the {@link PendingCheckpoint#acknowledgedTasks} method.
+	 */
+	public enum TaskAcknowledgeResult {
+		SUCCESS, // successful acknowledge of the task
+		DUPLICATE, // acknowledge message is a duplicate
+		UNKNOWN, // unknown task acknowledged
+		DISCARDED // pending checkpoint has been discarded
+	}
+
 	// ------------------------------------------------------------------------
 	//  Cancellation
 	// ------------------------------------------------------------------------
@@ -350,6 +380,7 @@ public class PendingCheckpoint {
 			} finally {
 				taskStates.clear();
 				notYetAcknowledgedTasks.clear();
+				acknowledgedTasks.clear();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/72b295b3/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 5c50c02..a59ffa2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -84,6 +84,7 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -996,6 +997,112 @@ public class CheckpointCoordinatorTest {
 		}
 	}
 
+	/**
+	 * Tests that late acknowledge checkpoint messages are properly cleaned up. Furthermore it tests
+	 * that unknown checkpoint messages for the same job a are cleaned up as well. In contrast
+	 * checkpointing messages from other jobs should not be touched. A late acknowledge
+	 * message is an acknowledge message which arrives after the checkpoint has been declined.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testStateCleanupForLateOrUnknownMessages() throws Exception {
+		final JobID jobId = new JobID();
+
+		final ExecutionAttemptID triggerAttemptId = new ExecutionAttemptID();
+		final ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptId);
+
+		final ExecutionAttemptID ackAttemptId1 = new ExecutionAttemptID();
+		final ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptId1);
+
+		final ExecutionAttemptID ackAttemptId2 = new ExecutionAttemptID();
+		final ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptId2);
+
+		final long timestamp = 1L;
+
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jobId,
+			20000L,
+			20000L,
+			0L,
+			1,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[] { triggerVertex },
+			new ExecutionVertex[] {triggerVertex, ackVertex1, ackVertex2},
+			new ExecutionVertex[0],
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			null,
+			new DisabledCheckpointStatsTracker());
+
+		assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+		PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();
+
+		long checkpointId = pendingCheckpoint.getCheckpointId();
+
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
+		SubtaskState triggerSubtaskState = mock(SubtaskState.class);
+
+		// acknowledge the first trigger vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointMetaData, triggerSubtaskState));
+
+		SubtaskState unknownSubtaskState = mock(SubtaskState.class);
+
+		// receive an acknowledge message for an unknown vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointMetaData, unknownSubtaskState));
+
+		// we should discard acknowledge messages from an unknown vertex belonging to our job
+		verify(unknownSubtaskState, times(1)).discardState();
+
+		SubtaskState differentJobSubtaskState = mock(SubtaskState.class);
+
+		// receive an acknowledge message from an unknown job
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointMetaData, differentJobSubtaskState));
+
+		// we should not interfere with different jobs
+		verify(differentJobSubtaskState, never()).discardState();
+
+		// duplicate acknowledge message for the trigger vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointMetaData, triggerSubtaskState));
+
+		// duplicate acknowledge messages for a known vertex should not trigger discarding the state
+		verify(triggerSubtaskState, never()).discardState();
+
+		// let the checkpoint fail at the first ack vertex
+		coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId));
+
+		assertTrue(pendingCheckpoint.isDiscarded());
+
+		// check that we've cleaned up the already acknowledged state
+		verify(triggerSubtaskState, times(1)).discardState();
+
+		SubtaskState ackSubtaskState = mock(SubtaskState.class);
+
+		// late acknowledge message from the second ack vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointMetaData, ackSubtaskState));
+
+		// check that we also cleaned up this state
+		verify(ackSubtaskState, times(1)).discardState();
+
+		// receive an acknowledge message from an unknown job
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointMetaData, differentJobSubtaskState));
+
+		// we should not interfere with different jobs
+		verify(differentJobSubtaskState, never()).discardState();
+
+		SubtaskState unknownSubtaskState2 = mock(SubtaskState.class);
+
+		// receive an acknowledge message for an unknown vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointMetaData, unknownSubtaskState2));
+
+		// we should discard acknowledge messages from an unknown vertex belonging to our job
+		verify(unknownSubtaskState2, times(1)).discardState();
+	}
+
 	@Test
 	public void testPeriodicTriggering() {
 		try {