You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/16 23:36:43 UTC

[1/2] flink git commit: [FLINK-5057] [taskmanager] Read cancellation timeout from task manager config

Repository: flink
Updated Branches:
  refs/heads/release-1.1 723ce7256 -> 4daf3bbc1


[FLINK-5057] [taskmanager] Read cancellation timeout from task manager config

This closes #2794


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad3e674b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad3e674b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad3e674b

Branch: refs/heads/release-1.1
Commit: ad3e674b26fb5766ffefda653701af5180d60413
Parents: 723ce72
Author: Ufuk Celebi <uc...@apache.org>
Authored: Sat Nov 12 20:19:15 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 16 21:26:30 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/taskmanager/Task.java     |  5 +++--
 .../apache/flink/runtime/taskmanager/TaskStopTest.java |  7 +++++--
 .../org/apache/flink/runtime/taskmanager/TaskTest.java | 13 +++++++------
 3 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad3e674b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 56aea1b..514a8d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -287,11 +287,12 @@ public class Task implements Runnable {
 		this.nameOfInvokableClass = taskInformation.getInvokableClassName();
 		this.operatorState = operatorState;
 
-		this.taskCancellationInterval = taskConfiguration.getLong(
+		Configuration tmConfig = taskManagerConfig.getConfiguration();
+		this.taskCancellationInterval = tmConfig.getLong(
 				ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
 				ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
 
-		this.taskCancellationTimeout = taskConfiguration.getLong(
+		this.taskCancellationTimeout = tmConfig.getLong(
 				ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS,
 				ConfigConstants.DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad3e674b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 8c41a9b..028a49a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -47,6 +47,7 @@ import java.lang.reflect.Field;
 import java.util.Collections;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, FiniteDuration.class })
@@ -54,6 +55,8 @@ public class TaskStopTest {
 	private Task task;
 
 	public void doMocking(AbstractInvokable taskMock) throws Exception {
+		TaskManagerRuntimeInfo tmRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
+			when(tmRuntimeInfo.getConfiguration()).thenReturn(new Configuration());
 
 		task = new Task(
 			mock(JobInformation.class),
@@ -79,7 +82,7 @@ public class TaskStopTest {
 			mock(FiniteDuration.class),
 			mock(LibraryCacheManager.class),
 			mock(FileCache.class),
-			mock(TaskManagerRuntimeInfo.class),
+			tmRuntimeInfo,
 			mock(TaskMetricGroup.class));
 
 		Field f = task.getClass().getDeclaredField("invokable");
@@ -91,7 +94,7 @@ public class TaskStopTest {
 		f2.set(task, ExecutionState.RUNNING);
 	}
 
-	@Test(timeout = 10000)
+	@Test(timeout = 20000)
 	public void testStopExecution() throws Exception {
 		StoppableTestTask taskMock = new StoppableTestTask();
 		doMocking(taskMock);

http://git-wip-us.apache.org/repos/asf/flink/blob/ad3e674b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index b5056ed..7bf73bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -752,10 +752,11 @@ public class TaskTest extends TestLogger {
 		return createTask(invokable, libCache, networkEnvironment, new Configuration(), new ExecutionConfig());
 	}
 
-	private Task createTask(Class<? extends AbstractInvokable> invokable,
-							LibraryCacheManager libCache,
-							NetworkEnvironment networkEnvironment,
-		Configuration taskConfig,
+	private Task createTask(
+		Class<? extends AbstractInvokable> invokable,
+		LibraryCacheManager libCache,
+		NetworkEnvironment networkEnvironment,
+		Configuration taskManagerConfig,
 		ExecutionConfig execConfig) throws IOException {
 		
 		JobID jobId = new JobID();
@@ -777,7 +778,7 @@ public class TaskTest extends TestLogger {
 			"Test Task",
 			1,
 			invokable.getName(),
-			taskConfig);
+			new Configuration());
 		
 		return new Task(
 			jobInformation,
@@ -798,7 +799,7 @@ public class TaskTest extends TestLogger {
 				new FiniteDuration(60, TimeUnit.SECONDS),
 				libCache,
 				mock(FileCache.class),
-				new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+				new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
 				mock(TaskMetricGroup.class));
 	}
 


[2/2] flink git commit: [FLINK-5063] [checkpointing] Discard state handles of declined or expired state handles

Posted by se...@apache.org.
[FLINK-5063] [checkpointing] Discard state handles of declined or expired state handles

Whenever the checkpoint coordinator receives an acknowledge checkpoint message which belongs
to the job maintained by the checkpoint coordinator, it should either record the state handles
for later processing or discard them to free the resources. The latter case can happen if a
checkpoint has expired and late acknowledge checkpoint messages arrive. Furthermore, it
can happen if a Task sent a decline checkpoint message while other Tasks were still drawing
a checkpoint. This PR changes the behaviour such that state handles belonging to the job of
the checkpoint coordinator are discarded if they could not be added to the PendingCheckpoint.

This closes #2813


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4daf3bbc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4daf3bbc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4daf3bbc

Branch: refs/heads/release-1.1
Commit: 4daf3bbc1e0251e1e84d799421dae9e3fa2363fc
Parents: ad3e674
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 14 18:33:55 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 16 21:29:32 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  88 ++++++++----
 .../runtime/checkpoint/PendingCheckpoint.java   | 103 ++++++++------
 .../checkpoint/CheckpointCoordinatorTest.java   | 134 ++++++++++++++++++-
 3 files changed, 256 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4daf3bbc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 8661ddc..5c517b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -674,42 +674,65 @@ public class CheckpointCoordinator {
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
 				isPendingCheckpoint = true;
 
-				if (checkpoint.acknowledgeTask(
-					message.getTaskExecutionId(),
-					message.getState(),
-					message.getStateSize(),
-					null)) { // TODO: Give KV-state to the acknowledgeTask method
-					if (checkpoint.isFullyAcknowledged()) {
-						completed = checkpoint.finalizeCheckpoint();
+				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize(), null)) {
+					case SUCCESS:
+						// TODO: Give KV-state to the acknowledgeTask method
+						if (checkpoint.isFullyAcknowledged()) {
+							completed = checkpoint.finalizeCheckpoint();
 
-						completedCheckpointStore.addCheckpoint(completed);
+							completedCheckpointStore.addCheckpoint(completed);
 
-						LOG.info("Completed checkpoint " + checkpointId + " (in " +
+							LOG.info("Completed checkpoint " + checkpointId + " (in " +
 								completed.getDuration() + " ms)");
 
-						if (LOG.isDebugEnabled()) {
-							StringBuilder builder = new StringBuilder();
-							for (Map.Entry<JobVertexID, TaskState> entry: completed.getTaskStates().entrySet()) {
-								builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
-							}
+							if (LOG.isDebugEnabled()) {
+								StringBuilder builder = new StringBuilder();
+								for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
+									builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
+								}
 
-							LOG.debug(builder.toString());
-						}
+								LOG.debug(builder.toString());
+							}
 
-						pendingCheckpoints.remove(checkpointId);
-						rememberRecentCheckpointId(checkpointId);
+							pendingCheckpoints.remove(checkpointId);
+							rememberRecentCheckpointId(checkpointId);
 
-						dropSubsumedCheckpoints(completed.getTimestamp());
+							dropSubsumedCheckpoints(completed.getTimestamp());
 
-						onFullyAcknowledgedCheckpoint(completed);
+							onFullyAcknowledgedCheckpoint(completed);
 
-						triggerQueuedRequests();
-					}
-				}
-				else {
-					// checkpoint did not accept message
-					LOG.error("Received duplicate or invalid acknowledge message for checkpoint " + checkpointId
-							+ " , task " + message.getTaskExecutionId());
+							triggerQueuedRequests();
+						}
+						break;
+					case DUPLICATE:
+						LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
+							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
+						break;
+					case UNKNOWN:
+						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
+								"because the task's execution attempt id was unknown. Discarding " +
+								"the state handle to avoid lingering state.", message.getCheckpointId(),
+							message.getTaskExecutionId(), message.getJob());
+
+						try {
+							message.getState().deserializeValue(userClassLoader).discardState();
+						} catch (Exception e) {
+							LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
+								message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
+						}
+						break;
+					case DISCARDED:
+						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
+							"because the pending checkpoint had been discarded. Discarding the " +
+								"state handle tp avoid lingering state.",
+							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
+
+						try {
+							message.getState().deserializeValue(userClassLoader).discardState();
+						} catch (Exception e) {
+							LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
+								message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
+						}
 				}
 			}
 			else if (checkpoint != null) {
@@ -721,11 +744,20 @@ public class CheckpointCoordinator {
 				// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
 				if (recentPendingCheckpoints.contains(checkpointId)) {
 					isPendingCheckpoint = true;
-					LOG.warn("Received late message for now expired checkpoint attempt " + checkpointId);
+					LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId);
 				}
 				else {
+					LOG.debug("Received message for an unknown checkpoint {}.", checkpointId);
 					isPendingCheckpoint = false;
 				}
+
+				try {
+					// try to discard the state so that we don't have lingering state lying around
+					message.getState().deserializeValue(userClassLoader).discardState();
+				} catch (Exception e) {
+					LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
+						message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
+				}
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4daf3bbc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index cd1f6c4..e81bb09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * A pending checkpoint is a checkpoint that has been started, but has not been
@@ -50,6 +52,9 @@ public class PendingCheckpoint {
 
 	private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
 	
+	/** Set of acknowledged tasks */
+	private final Set<ExecutionAttemptID> acknowledgedTasks;
+
 	private int numAcknowledgedTasks;
 	
 	private boolean discarded;
@@ -72,6 +77,8 @@ public class PendingCheckpoint {
 		
 		this.notYetAcknowledgedTasks = verticesToConfirm;
 		this.taskStates = new HashMap<>();
+
+		acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -131,65 +138,82 @@ public class PendingCheckpoint {
 		}
 	}
 	
-	public boolean acknowledgeTask(
-			ExecutionAttemptID attemptID,
+	public TaskAcknowledgeResult acknowledgeTask(
+			ExecutionAttemptID executionAttemptId,
 			SerializedValue<StateHandle<?>> state,
 			long stateSize,
 			Map<Integer, SerializedValue<StateHandle<?>>> kvState) {
 
 		synchronized (lock) {
 			if (discarded) {
-				return false;
+				return TaskAcknowledgeResult.DISCARDED;
 			}
 			
-			ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
-			if (vertex != null) {
-				if (state != null || kvState != null) {
+			final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);
+
+			if (vertex == null) {
+				if (acknowledgedTasks.contains(executionAttemptId)) {
+					return TaskAcknowledgeResult.DUPLICATE;
+				} else {
+					return TaskAcknowledgeResult.UNKNOWN;
+				}
+			} else {
+				acknowledgedTasks.add(executionAttemptId);
+			}
 
-					JobVertexID jobVertexID = vertex.getJobvertexId();
 
-					TaskState taskState;
+			if (state != null || kvState != null) {
+				JobVertexID jobVertexID = vertex.getJobvertexId();
 
-					if (taskStates.containsKey(jobVertexID)) {
-						taskState = taskStates.get(jobVertexID);
-					} else {
-						taskState = new TaskState(jobVertexID, vertex.getTotalNumberOfParallelSubtasks());
-						taskStates.put(jobVertexID, taskState);
-					}
+				TaskState taskState;
+
+				if (taskStates.containsKey(jobVertexID)) {
+					taskState = taskStates.get(jobVertexID);
+				} else {
+					taskState = new TaskState(jobVertexID, vertex.getTotalNumberOfParallelSubtasks());
+					taskStates.put(jobVertexID, taskState);
+				}
 
-					long timestamp = System.currentTimeMillis() - checkpointTimestamp;
+				long timestamp = System.currentTimeMillis() - checkpointTimestamp;
+
+				if (state != null) {
+					taskState.putState(
+						vertex.getParallelSubtaskIndex(),
+						new SubtaskState(
+							state,
+							stateSize,
+							timestamp
+						)
+					);
+				}
 
-					if (state != null) {
-						taskState.putState(
-							vertex.getParallelSubtaskIndex(),
-							new SubtaskState(
-								state,
-								stateSize,
+				if (kvState != null) {
+					for (Map.Entry<Integer, SerializedValue<StateHandle<?>>> entry : kvState.entrySet()) {
+						taskState.putKvState(
+							entry.getKey(),
+							new KeyGroupState(
+								entry.getValue(),
+								0L,
 								timestamp
-							)
-						);
-					}
+							));
 
-					if (kvState != null) {
-						for (Map.Entry<Integer, SerializedValue<StateHandle<?>>> entry : kvState.entrySet()) {
-							taskState.putKvState(
-								entry.getKey(),
-								new KeyGroupState(
-									entry.getValue(),
-									0L,
-									timestamp
-								));
-						}
 					}
 				}
-				numAcknowledgedTasks++;
-				return true;
-			}
-			else {
-				return false;
 			}
+			numAcknowledgedTasks++;
+			return TaskAcknowledgeResult.SUCCESS;
 		}
 	}
+
+	/**
+	 * Result of the {@link PendingCheckpoint#acknowledgeTask} method.
+	 */
+	public enum TaskAcknowledgeResult {
+		SUCCESS, // successful acknowledge of the task
+		DUPLICATE, // acknowledge message is a duplicate
+		UNKNOWN, // unknown task acknowledged
+		DISCARDED // pending checkpoint has been discarded
+	}
 	
 	/**
 	 * Discards the pending checkpoint, releasing all held resources.
@@ -211,6 +235,7 @@ public class PendingCheckpoint {
 			} finally {
 				taskStates.clear();
 				notYetAcknowledgedTasks.clear();
+				acknowledgedTasks.clear();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4daf3bbc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index f3f988a..f05b5d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -49,8 +51,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -954,6 +958,118 @@ public class CheckpointCoordinatorTest {
 		}
 	}
 
+	/**
+	 * Tests that late acknowledge checkpoint messages are properly cleaned up. Furthermore it tests
+	 * that unknown checkpoint messages for the same job are cleaned up as well. In contrast
+	 * checkpointing messages from other jobs should not be touched. A late acknowledge
+	 * message is an acknowledge message which arrives after the checkpoint has been declined.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testStateCleanupForLateOrUnknownMessages() throws Exception {
+		final JobID jobId = new JobID();
+
+		final ExecutionAttemptID triggerAttemptId = new ExecutionAttemptID();
+		final ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptId);
+
+		final ExecutionAttemptID ackAttemptId1 = new ExecutionAttemptID();
+		final ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptId1);
+
+		final ExecutionAttemptID ackAttemptId2 = new ExecutionAttemptID();
+		final ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptId2);
+
+		final long timestamp = 1L;
+
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jobId,
+			20000L,
+			20000L,
+			42,
+			new ExecutionVertex[] { triggerVertex },
+			new ExecutionVertex[] {triggerVertex, ackVertex1, ackVertex2},
+			new ExecutionVertex[0],
+			cl,
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(2, cl),
+			RecoveryMode.STANDALONE);
+
+		assertTrue(coord.triggerCheckpoint(timestamp));
+
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+		PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();
+
+		long checkpointId = pendingCheckpoint.getCheckpointId();
+
+		StateHandle<?> triggerSubtaskState = mock(StateHandle.class);
+		SerializedValue<StateHandle<?>> triggerSerializedValue = mock(SerializedValue.class);
+		doReturn(triggerSubtaskState).when(triggerSerializedValue).deserializeValue(any(ClassLoader.class));
+
+		// acknowledge the first trigger vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, triggerSerializedValue, 1L));
+
+		StateHandle<?> unknownSubtaskState = mock(StateHandle.class);
+		SerializedValue<StateHandle<?>> unknownSerializedValue = mock(SerializedValue.class);
+		doReturn(unknownSubtaskState).when(unknownSerializedValue).deserializeValue(any(ClassLoader.class));
+
+		// receive an acknowledge message for an unknown vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, unknownSerializedValue, 1L));
+
+		// we should discard acknowledge messages from an unknown vertex belonging to our job
+		verify(unknownSubtaskState, times(1)).discardState();
+
+		StateHandle<?> differentJobSubtaskState = mock(StateHandle.class);
+		SerializedValue<StateHandle<?>> differentJobSerializedValue = mock(SerializedValue.class);
+		doReturn(differentJobSubtaskState).when(differentJobSerializedValue).deserializeValue(any(ClassLoader.class));
+
+		// receive an acknowledge message from an unknown job
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, differentJobSerializedValue, 1L));
+
+		// we should not interfere with different jobs
+		verify(differentJobSubtaskState, never()).discardState();
+
+		// duplicate acknowledge message for the trigger vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, triggerSerializedValue, 1L));
+
+		// duplicate acknowledge messages for a known vertex should not trigger discarding the state
+		verify(triggerSubtaskState, never()).discardState();
+
+		// let the checkpoint fail at the first ack vertex
+		coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId));
+
+		assertTrue(pendingCheckpoint.isDiscarded());
+
+		// check that we've cleaned up the already acknowledged state
+		verify(triggerSubtaskState, times(1)).discardState();
+
+		StateHandle<?> ackSubtaskState = mock(StateHandle.class);
+		SerializedValue<StateHandle<?>> ackSerializedValue = mock(SerializedValue.class);
+		doReturn(ackSubtaskState).when(ackSerializedValue).deserializeValue(any(ClassLoader.class));
+
+		// late acknowledge message from the second ack vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, ackSerializedValue, 1L));
+
+		// check that we also cleaned up this state
+		verify(ackSubtaskState, times(1)).discardState();
+
+		// receive an acknowledge message from an unknown job
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, differentJobSerializedValue, 1L));
+
+		// we should not interfere with different jobs
+		verify(differentJobSubtaskState, never()).discardState();
+
+		StateHandle<?> unknownSubtaskState2 = mock(StateHandle.class);
+		SerializedValue<StateHandle<?>> unknownSerializedValue2 = mock(SerializedValue.class);
+		doReturn(unknownSubtaskState2).when(unknownSerializedValue2).deserializeValue(any(ClassLoader.class));
+
+		// receive an acknowledge message for an unknown vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, unknownSerializedValue2, 1L));
+
+		// we should discard acknowledge messages from an unknown vertex belonging to our job
+		verify(unknownSubtaskState2, times(1)).discardState();
+	}
+
 	@Test
 	public void testPeriodicTriggering() {
 		try {
@@ -1375,18 +1491,32 @@ public class CheckpointCoordinatorTest {
 	// ------------------------------------------------------------------------
 
 	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
-		return mockExecutionVertex(attemptID, ExecutionState.RUNNING);
+		return mockExecutionVertex(attemptID, 1);
+	}
+
+	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptId, int parallelism) {
+		return mockExecutionVertex(attemptId, ExecutionState.RUNNING, parallelism);
 	}
 
 	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, 
 														ExecutionState state, ExecutionState ... successiveStates) {
+		return mockExecutionVertex(attemptID, state, 1, successiveStates);
+	}
+
+	private static ExecutionVertex mockExecutionVertex(
+			ExecutionAttemptID attemptId,
+			ExecutionState state,
+			int parallelism,
+			ExecutionState ... successiveStates) {
+
 		final Execution exec = mock(Execution.class);
-		when(exec.getAttemptId()).thenReturn(attemptID);
+		when(exec.getAttemptId()).thenReturn(attemptId);
 		when(exec.getState()).thenReturn(state, successiveStates);
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
 		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
 
 		return vertex;
 	}