You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/29 11:55:45 UTC

[5/5] flink git commit: [FLINK-1953] [runtime] Implement robust and flexible checkpoint coordinator with tests.

[FLINK-1953] [runtime] Implement robust and flexible checkpoint coordinator with tests.

 - Checkpoints can be configured to have different sets of tasks
   that trigger the checkpoint barriers, that acknowledge the checkpoint,
   and that require checkpoint confirmations.

 - A configurable number of successful checkpoints can be retained

 - Checkpoints time out after a certain time, if not acknowledged (prevent resource leaks)

 - Checkpoints are robust to lost messages and out-of-order acknowledgements.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f0ce142
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f0ce142
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f0ce142

Branch: refs/heads/master
Commit: 7f0ce1428bc32181d6d79ca6f1226b9e2e3d93be
Parents: b1af2df
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 27 22:14:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 29 10:47:21 2015 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 380 ++++++++++++
 .../runtime/checkpoint/PendingCheckpoint.java   | 163 +++++
 .../flink/runtime/checkpoint/StateForTask.java  |  90 +++
 .../checkpoint/SuccessfulCheckpoint.java        |  75 +++
 .../runtime/executiongraph/ExecutionVertex.java |  25 +
 .../messages/checkpoint/AbortCheckpoint.java    |  49 ++
 .../checkpoint/AbstractCheckpointMessage.java   |  91 +++
 .../checkpoint/AcknowledgeCheckpoint.java       |  73 +++
 .../messages/checkpoint/ConfirmCheckpoint.java  |  49 ++
 .../messages/checkpoint/TriggerCheckpoint.java  |  73 +++
 .../messages/checkpoint/package-info.java       |  24 +
 .../flink/runtime/messages/package-info.java    |  24 +
 .../checkpoint/CheckpointCoordinatorTest.java   | 620 +++++++++++++++++++
 .../messages/CheckpointMessagesTest.java        | 103 +++
 14 files changed, 1839 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
new file mode 100644
index 0000000..9647ca4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
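+ * Coordinates the checkpoints of a job: triggers them, collects the acknowledgements, and confirms completed checkpoints.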
+ */
+public class CheckpointCoordinator {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
+	
+	/** The number of recent checkpoints whose IDs are remembered */
+	private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
+	
+	
+	/** Coordinator-wide lock to safeguard the checkpoint updates */
+	private final Object lock = new Object();
+	
+	/** The job whose checkpoint this coordinator coordinates */
+	private final JobID job;
+	
+	/** Tasks who need to be sent a message when a checkpoint is started */
+	private final ExecutionVertex[] tasksToTrigger;
+
+	/** Tasks who need to acknowledge a checkpoint before it succeeds */
+	private final ExecutionVertex[] tasksToWaitFor;
+	
+	/** Tasks who need to be sent a message when a checkpoint is confirmed */
+	private final ExecutionVertex[] tasksToCommitTo;
+
+	private final Map<Long, PendingCheckpoint> pendingCheckpoints;
+	
+	private final ArrayDeque<SuccessfulCheckpoint> completedCheckpoints;
+	
+	private final ArrayDeque<Long> recentPendingCheckpoints;
+
+	private final AtomicLong checkpointIdCounter = new AtomicLong(1);
+
+	private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger();
+
+	/** The timer that processes the checkpoint timeouts */
+	private final Timer timeoutTimer;
+	
+	private final long checkpointTimeout;
+	
+	private final int numSuccessfulCheckpointsToRetain;
+	
+	private boolean shutdown;
+	
+	// --------------------------------------------------------------------------------------------
+
+	public CheckpointCoordinator(JobID job, int numSuccessfulCheckpointsToRetain, long checkpointTimeout,
+								ExecutionVertex[] tasksToTrigger,
+								ExecutionVertex[] tasksToWaitFor,
+								ExecutionVertex[] tasksToCommitTo) {
+		
+		// some sanity checks
+		if (job == null || tasksToTrigger == null ||
+				tasksToWaitFor == null || tasksToCommitTo == null) {
+			throw new NullPointerException();
+		}
+		if (numSuccessfulCheckpointsToRetain < 1) {
+			throw new IllegalArgumentException("Must retain at least one successful checkpoint");
+		}
+		if (checkpointTimeout < 1) {
+			throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
+		}
+		
+		this.job = job;
+		this.numSuccessfulCheckpointsToRetain = numSuccessfulCheckpointsToRetain;
+		this.checkpointTimeout = checkpointTimeout;
+		this.tasksToTrigger = tasksToTrigger;
+		this.tasksToWaitFor = tasksToWaitFor;
+		this.tasksToCommitTo = tasksToCommitTo;
+		this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>();
+		this.completedCheckpoints = new ArrayDeque<SuccessfulCheckpoint>(numSuccessfulCheckpointsToRetain + 1);
+		this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
+
+		timeoutTimer = new Timer("Checkpoint Timeout Handler", true);
+	}
+
+	/**
+	 * Shuts down the checkpoint coordinator.
+	 * 
+	 * After this method has been called, the coordinator does not accept any further
+	 * messages and cannot trigger any further checkpoints.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			if (shutdown) {
+				return;
+			}
+			shutdown = true;
+			
+			// shut down the thread that handles the timeouts
+			timeoutTimer.cancel();
+			
+			// clear and discard all pending checkpoints
+			for (PendingCheckpoint pending : pendingCheckpoints.values()) {
+				pending.discard();
+			}
+			pendingCheckpoints.clear();
+			
+			// clean and discard all successful checkpoints
+			for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
+				checkpoint.dispose();
+			}
+			completedCheckpoints.clear();
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Handling checkpoints and messages
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Triggers a new checkpoint and uses the current system time as the
+	 * checkpoint time.
+	 */
+	public void triggerCheckpoint() {
+		triggerCheckpoint(System.currentTimeMillis());
+	}
+
+	/**
+	 * Triggers a new checkpoint and uses the given timestamp as the checkpoint
+	 * timestamp.
+	 * 
+	 * @param timestamp The timestamp for the checkpoint.
+	 */
+	public boolean triggerCheckpoint(final long timestamp) {
+		if (shutdown) {
+			LOG.error("Cannot trigger checkpoint, checkpoint coordinator has been shutdown.");
+			return false;
+		}
+		
+		final long checkpointID = checkpointIdCounter.getAndIncrement();
+		LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+		
+		try {
+			// first check if all tasks that we need to trigger are running.
+			// if not, abort the checkpoint
+			ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
+			for (int i = 0; i < tasksToTrigger.length; i++) {
+				Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
+				if (ee != null) {
+					triggerIDs[i] = ee.getAttemptId();
+				} else {
+					LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
+							tasksToTrigger[i].getSimpleName());
+					return false;
+				}
+			}
+
+			// next, check if all tasks that need to acknowledge the checkpoint are running.
+			// if not, abort the checkpoint
+			Map<ExecutionAttemptID, ExecutionVertex> ackTasks =
+								new HashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length);
+
+			for (ExecutionVertex ev : tasksToWaitFor) {
+				Execution ee = ev.getCurrentExecutionAttempt();
+				if (ee != null) {
+					ackTasks.put(ee.getAttemptId(), ev);
+				} else {
+					LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
+							ev.getSimpleName());
+					return false;
+				}
+			}
+			
+			// register a new pending checkpoint. this makes sure we can properly receive acknowledgements
+			final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+
+			// schedule the timer that will clean up the expired checkpoints
+			TimerTask canceller = new TimerTask() {
+				@Override
+				public void run() {
+					try {
+						synchronized (lock) {
+							// only do the work if the checkpoint is not discarded anyways
+							// note that checkpoint completion discards the pending checkpoint object
+							if (!checkpoint.isDiscarded()) {
+								LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+								checkpoint.discard();
+								pendingCheckpoints.remove(checkpointID);
+								rememberRecentCheckpointId(checkpointID);
+							}
+						}
+					}
+					catch (Throwable t) {
+						LOG.error("Exception while handling checkpoint timeout", t);
+					}
+				}
+			};
+			
+			synchronized (lock) {
+				if (shutdown) {
+					throw new IllegalStateException("Checkpoint coordinator has been shutdown.");
+				}
+				pendingCheckpoints.put(checkpointID, checkpoint);
+				timeoutTimer.schedule(canceller, checkpointTimeout);
+			}
+
+			// send the messages to the tasks that trigger their checkpoint
+			for (int i = 0; i < tasksToTrigger.length; i++) {
+				ExecutionAttemptID id = triggerIDs[i];
+				TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
+				tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
+			}
+			
+			numUnsuccessfulCheckpointsTriggers.set(0);
+			return true;
+		}
+		catch (Throwable t) {
+			int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
+			LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+			
+			synchronized (lock) {
+				pendingCheckpoints.remove(checkpointID);
+			}
+			
+			return false;
+		}
+	}
+	
+	public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) {
+		if (shutdown || message == null) {
+			return;
+		}
+		if (!job.equals(message.getJob())) {
+			LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message);
+			return;
+		}
+		
+		final long checkpointId = message.getCheckpointId();
+		boolean checkpointCompleted = false;
+		
+		synchronized (lock) {
+			// we need to check inside the lock for being shutdown as well, otherwise we
+			// get races and invalid error log messages
+			if (shutdown) {
+				return;
+			}
+			
+			PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
+			if (checkpoint != null && !checkpoint.isDiscarded()) {
+				if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState())) {
+					
+					if (checkpoint.isFullyAcknowledged()) {
+						LOG.info("Completed checkpoint " + checkpointId);
+
+						SuccessfulCheckpoint completed = checkpoint.toCompletedCheckpoint();
+						completedCheckpoints.addLast(completed);
+						if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
+							completedCheckpoints.removeFirst();
+						}
+						pendingCheckpoints.remove(checkpointId);
+						rememberRecentCheckpointId(checkpointId);
+						
+						dropSubsumedCheckpoints(completed.getTimestamp());
+						
+						checkpointCompleted = true;
+					}
+				}
+				else {
+					// checkpoint did not accept message
+					LOG.error("Received duplicate or invalid acknowledge message for checkpoint " + checkpointId
+							+ " , task " + message.getTaskExecutionId());
+				}
+			}
+			else if (checkpoint != null) {
+				// this should not happen
+				throw new IllegalStateException(
+						"Received message for discarded but non-removed checkpoint " + checkpointId);
+			}
+			else {
+				// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
+				if (recentPendingCheckpoints.contains(checkpointId)) {
+					LOG.warn("Received late message for now expired checkpoint attempt " + checkpointId);
+				}
+				else {
+					LOG.info("Received message for non-existing checkpoint " + checkpointId);
+				}
+			}
+		}
+		
+		// send the confirmation messages to the necessary targets. we do this here
+		// to be outside the lock scope
+		if (checkpointCompleted) {
+			for (ExecutionVertex ev : tasksToCommitTo) {
+				Execution ee = ev.getCurrentExecutionAttempt();
+				if (ee != null) {
+					ExecutionAttemptID attemptId = ee.getAttemptId();
+					ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId);
+					ev.sendMessageToCurrentExecution(confirmMessage, ee.getAttemptId());
+				}
+			}
+		}
+	}
+	
+	private void rememberRecentCheckpointId(long id) {
+		if (recentPendingCheckpoints.size() >= NUM_GHOST_CHECKPOINT_IDS) {
+			recentPendingCheckpoints.removeFirst();
+		}
+		recentPendingCheckpoints.addLast(id);
+	}
+	
+	private void dropSubsumedCheckpoints(long timestamp) {
+		Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
+		while (entries.hasNext()) {
+			PendingCheckpoint p = entries.next().getValue();
+			if (p.getCheckpointTimestamp() < timestamp) {
+				rememberRecentCheckpointId(p.getCheckpointId());
+				p.discard();
+				entries.remove();
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Accessors
+	// --------------------------------------------------------------------------------------------
+	
+	public int getNumberOfPendingCheckpoints() {
+		return this.pendingCheckpoints.size();
+	}
+
+	public int getNumberOfRetainedSuccessfulCheckpoints() {
+		return this.completedCheckpoints.size();
+	}
+
+	public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
+		synchronized (lock) {
+			return new HashMap<Long, PendingCheckpoint>(this.pendingCheckpoints);
+		}
+	}
+	
+	public List<SuccessfulCheckpoint> getSuccessfulCheckpoints() {
+		synchronized (lock) {
+			return new ArrayList<SuccessfulCheckpoint>(this.completedCheckpoints);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
new file mode 100644
index 0000000..e221238
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.state.StateHandle;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A pending checkpoint is a checkpoint that has been started, but has not been
+ * acknowledged by all tasks that need to acknowledge it. Once all tasks have
+ * acknowledged it, it becomes a {@link SuccessfulCheckpoint}.
+ */
+public class PendingCheckpoint {
+	
+	private final Object lock = new Object();
+	
+	private final JobID jobId;
+	
+	private final long checkpointId;
+	
+	private final long checkpointTimestamp;
+	
+	private final List<StateForTask> collectedStates;
+	
+	private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
+	
+	private int numAcknowledgedTasks;
+	
+	private boolean discarded;
+
+	// --------------------------------------------------------------------------------------------
+	
+	public PendingCheckpoint(JobID jobId, long checkpointId, long checkpointTimestamp,
+							Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm)
+	{
+		if (jobId == null || verticesToConfirm == null) {
+			throw new NullPointerException();
+		}
+		if (verticesToConfirm.size() == 0) {
+			throw new IllegalArgumentException("Checkpoint needs at least one vertex that commits the checkpoint");
+		}
+		
+		this.jobId = jobId;
+		this.checkpointId = checkpointId;
+		this.checkpointTimestamp = checkpointTimestamp;
+		
+		this.notYetAcknowledgedTasks = verticesToConfirm;
+		this.collectedStates = new ArrayList<StateForTask>(notYetAcknowledgedTasks.size());
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	public long getCheckpointTimestamp() {
+		return checkpointTimestamp;
+	}
+
+	public int getNumberOfNonAcknowledgedTasks() {
+		return notYetAcknowledgedTasks.size();
+	}
+	
+	public int getNumberOfAcknowledgedTasks() {
+		return numAcknowledgedTasks;
+	}
+	
+	public boolean isFullyAcknowledged() {
+		return this.notYetAcknowledgedTasks.isEmpty() && !discarded;
+	}
+	
+	public boolean isDiscarded() {
+		return discarded;
+	}
+	
+	public List<StateForTask> getCollectedStates() {
+		return collectedStates;
+	}
+	
+	public SuccessfulCheckpoint toCompletedCheckpoint() {
+		synchronized (lock) {
+			if (discarded) {
+				throw new IllegalStateException("pending checkpoint is discarded");
+			}
+			if (notYetAcknowledgedTasks.isEmpty()) {
+				SuccessfulCheckpoint completed =  new SuccessfulCheckpoint(jobId, checkpointId,
+						checkpointTimestamp, new ArrayList<StateForTask>(collectedStates));
+				discard();
+				return completed;
+			}
+			else {
+				throw new IllegalStateException("Cannot complete checkpoint while nit all tasks are acknowledged");
+			}
+		}
+	}
+	
+	public boolean acknowledgeTask(ExecutionAttemptID attemptID, StateHandle state) {
+		synchronized (lock) {
+			if (discarded) {
+				return false;
+			}
+			
+			ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
+			if (vertex != null) {
+				if (state != null) {
+					collectedStates.add(new StateForTask(state, vertex.getJobvertexId(), vertex.getParallelSubtaskIndex()));
+				}
+				numAcknowledgedTasks++;
+				return true;
+			}
+			else {
+				return false;
+			}
+		}
+	}
+	
+	/**
+	 * Discards the pending checkpoint, releasing all held resources.
+	 */
+	public void discard() {
+		synchronized (lock) {
+			discarded = true;
+			numAcknowledgedTasks = -1;
+			collectedStates.clear();
+			notYetAcknowledgedTasks.clear();
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
new file mode 100644
index 0000000..83a6dc8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateHandle;
+
+/**
+ * Simple bean to describe the state belonging to a parallel operator.
+ * Since we hold the state across execution attempts, we identify a task by its
+ * JobVertexId and subtask index.
+ */
+public class StateForTask {
+
+	/** The state of the parallel operator */
+	private final StateHandle state;
+
+	/** The vertex id of the parallel operator */
+	private final JobVertexID operatorId;
+	
+	/** The index of the parallel subtask */
+	private final int subtask;
+
+	public StateForTask(StateHandle state, JobVertexID operatorId, int subtask) {
+		if (state == null || operatorId == null || subtask < 0) {
+			throw new IllegalArgumentException();
+		}
+		
+		this.state = state;
+		this.operatorId = operatorId;
+		this.subtask = subtask;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public StateHandle getState() {
+		return state;
+	}
+
+	public JobVertexID getOperatorId() {
+		return operatorId;
+	}
+
+	public int getSubtask() {
+		return subtask;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o instanceof StateForTask) {
+			StateForTask that = (StateForTask) o;
+			return this.subtask == that.subtask && this.operatorId.equals(that.operatorId)
+					&& this.state.equals(that.state);
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return state.hashCode() + 31 * operatorId.hashCode() + 43 * subtask;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("StateForTask %s-%d : %s", operatorId, subtask, state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
new file mode 100644
index 0000000..cd7efba
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.List;
+
+/**
+ * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
+ * and that is considered completed.
+ */
+public class SuccessfulCheckpoint {
+	
+	private final JobID job;
+	
+	private final long checkpointID;
+	
+	private final long timestamp;
+	
+	private final List<StateForTask> states;
+
+
+	public SuccessfulCheckpoint(JobID job, long checkpointID, long timestamp, List<StateForTask> states) {
+		this.job = job;
+		this.checkpointID = checkpointID;
+		this.timestamp = timestamp;
+		this.states = states;
+	}
+
+	public JobID getJobId() {
+		return job;
+	}
+
+	public long getCheckpointID() {
+		return checkpointID;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	public List<StateForTask> getStates() {
+		return states;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public void dispose() {
+		states.clear();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e7e019f..a44fc6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import akka.actor.ActorRef;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -448,6 +449,30 @@ public class ExecutionVertex implements Serializable {
 		this.currentExecution.fail(t);
 	}
 
+	public void sendMessageToCurrentExecution(Serializable message, ExecutionAttemptID attemptID) {
+		Execution exec = getCurrentExecutionAttempt();
+		
+		// check that this is for the correct execution attempt
+		if (exec != null && exec.getAttemptId().equals(attemptID)) {
+			SimpleSlot slot = exec.getAssignedResource();
+			
+			// send only if we actually have a target
+			if (slot != null) {
+				ActorRef taskManager = slot.getInstance().getTaskManager();
+				if (taskManager != null) {
+					taskManager.tell(message, ActorRef.noSender());
+				}
+			}
+			else {
+				LOG.debug("Skipping message to undeployed task execution {}/{}", getSimpleName(), attemptID);
+			}
+		}
+		else {
+			LOG.debug("Skipping message to {}/{} because it does not match the current execution",
+					getSimpleName(), attemptID);
+		}
+	}
+	
 	/**
 	 * Schedules or updates the consumer tasks of the result partition with the given ID.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java
new file mode 100644
index 0000000..0493ba6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the
+ * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a task that the checkpoint
+ * has been aborted.
+ */
+public class AbortCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
+
+	private static final long serialVersionUID = 2094094662279578953L;
+
+	public AbortCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+		super(job, taskExecutionId, checkpointId);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		return this == o || ( (o instanceof AbortCheckpoint) && super.equals(o));
+	}
+
+	@Override
+	public String toString() {
+		return String.format("AbortCheckpoint %d for (%s/%s)", 
+				getCheckpointId(), getJob(), getTaskExecutionId());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbstractCheckpointMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbstractCheckpointMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbstractCheckpointMessage.java
new file mode 100644
index 0000000..dc50dc9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbstractCheckpointMessage.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * The base class of all checkpoint messages.
+ */
+public abstract class AbstractCheckpointMessage implements java.io.Serializable {
+
+	private static final long serialVersionUID = 186780414819428178L;
+	
+	/** The job to which this message belongs */
+	private final JobID job;
+	
+	/** The task execution that is source/target of the checkpoint message */  
+	private final ExecutionAttemptID taskExecutionId;
+	
+	/** The ID of the checkpoint that this message coordinates */
+	private final long checkpointId;
+
+	
+	protected AbstractCheckpointMessage(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+		if (job == null || taskExecutionId == null) {
+			throw new NullPointerException();
+		}
+		
+		this.job = job;
+		this.taskExecutionId = taskExecutionId;
+		this.checkpointId = checkpointId;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public JobID getJob() {
+		return job;
+	}
+
+	public ExecutionAttemptID getTaskExecutionId() {
+		return taskExecutionId;
+	}
+
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return job.hashCode() + taskExecutionId.hashCode() + (int) (checkpointId ^ (checkpointId >>> 32));
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o instanceof AbstractCheckpointMessage) {
+			AbstractCheckpointMessage that = (AbstractCheckpointMessage) o;
+			return this.job.equals(that.job) && this.taskExecutionId.equals(that.taskExecutionId) &&
+					this.checkpointId == that.checkpointId;
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "(" + checkpointId + ':' + job + '/' + taskExecutionId + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
new file mode 100644
index 0000000..dd94e37
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.state.StateHandle;
+
+/**
+ * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
+ * {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the checkpoint of an
+ * individual task is completed.
+ * 
+ * This message may carry the handle to the task's state.
+ */
+public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
+
+	private static final long serialVersionUID = -7606214777192401493L;
+	
+	private final StateHandle state;
+
+	public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+		this(job, taskExecutionId, checkpointId, null);
+	}
+
+	public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, StateHandle state) {
+		super(job, taskExecutionId, checkpointId);
+		this.state = state;
+	}
+
+	public StateHandle getState() {
+		return state;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true ;
+		}
+		else if (o instanceof AcknowledgeCheckpoint) {
+			AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
+			return super.equals(o) && (this.state == null ? that.state == null :
+					(that.state != null && this.state.equals(that.state)));
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return String.format("Confirm Task Checkpoint %d for (%s/%s) - state=%s",
+				getCheckpointId(), getJob(), getTaskExecutionId(), state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
new file mode 100644
index 0000000..cdfd202
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the
+ * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a task that the checkpoint
+ * has been confirmed and that the task can commit the checkpoint to the outside world.
+ */
+public class ConfirmCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
+
+	private static final long serialVersionUID = 2094094662279578953L;
+
+	public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+		super(job, taskExecutionId, checkpointId);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		return this == o || ( (o instanceof ConfirmCheckpoint) && super.equals(o));
+	}
+
+	@Override
+	public String toString() {
+		return String.format("ConfirmCheckpoint %d for (%s/%s)", 
+				getCheckpointId(), getJob(), getTaskExecutionId());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
new file mode 100644
index 0000000..0528755
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the
+ * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a certain task to trigger its
+ * checkpoint.
+ */
+public class TriggerCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
+
+	private static final long serialVersionUID = 2094094662279578953L;
+	
+	/** The timestamp associated with the checkpoint */
+	private final long timestamp;
+
+	public TriggerCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
+		super(job, taskExecutionId, checkpointId);
+		this.timestamp = timestamp;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o instanceof TriggerCheckpoint) {
+			TriggerCheckpoint that = (TriggerCheckpoint) o;
+			return this.timestamp == that.timestamp && super.equals(o);
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return String.format("Trigger Checkpoint %d@%d for (%s/%s)", 
+				getCheckpointId(), getTimestamp(), getJob(), getTaskExecutionId());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
new file mode 100644
index 0000000..7b96b81
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the messages that are sent between {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link org.apache.flink.runtime.taskmanager.TaskManager} to coordinate the checkpoint snapshots of the
+ * distributed dataflow.
+ */
+package org.apache.flink.runtime.messages.checkpoint;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
new file mode 100644
index 0000000..e0b8cce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the messages that are sent between actors, like the
+ * {@link org.apache.flink.runtime.jobmanager.JobManager} and
+ * {@link org.apache.flink.runtime.taskmanager.TaskManager} to coordinate the distributed operations.
+ */
+package org.apache.flink.runtime.messages;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
new file mode 100644
index 0000000..aee0e63
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Tests for the checkpoint coordinator.
+ */
+public class CheckpointCoordinatorTest {
+	
+	@Test
+	public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
+
+			// create some mock Execution vertices that receive the checkpoint trigger messages
+			ExecutionVertex triggerVertex1 = mock(ExecutionVertex.class);
+			ExecutionVertex triggerVertex2 = mock(ExecutionVertex.class);
+			
+			// create some mock Execution vertices that need to ack the checkpoint
+			final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
+			ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
+			ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
+
+			// set up the coordinator and validate the initial state
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid, 1, 600000,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2 },
+					new ExecutionVertex[] {} );
+
+			// nothing should be happening
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should not succeed
+			assertFalse(coord.triggerCheckpoint(timestamp));
+
+			// still, nothing should be happening
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
+
+			// create some mock Execution vertices that receive the checkpoint trigger messages
+			final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
+			ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
+			ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
+			
+			// create some mock Execution vertices that need to ack the checkpoint
+			ExecutionVertex ackVertex1 = mock(ExecutionVertex.class);
+			ExecutionVertex ackVertex2 = mock(ExecutionVertex.class);
+
+			// set up the coordinator and validate the initial state
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid, 1, 600000,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2 },
+					new ExecutionVertex[] {} );
+
+			// nothing should be happening
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should not succeed
+			assertFalse(coord.triggerCheckpoint(timestamp));
+
+			// still, nothing should be happening
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTriggerAndConfirmSimpleCheckpoint() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
+			
+			// create some mock Execution vertices that receive the checkpoint trigger messages
+			final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+			ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+			ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);
+
+			// set up the coordinator and validate the initial state
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid, 1, 600000,
+					new ExecutionVertex[] { vertex1, vertex2 },
+					new ExecutionVertex[] { vertex1, vertex2 },
+					new ExecutionVertex[] { vertex1, vertex2 });
+			
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			
+			// trigger the first checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp));
+			
+			// validate that we have a pending checkpoint
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			
+			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
+			
+			assertNotNull(checkpoint);
+			assertEquals(checkpointId, checkpoint.getCheckpointId());
+			assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
+			assertEquals(jid, checkpoint.getJobId());
+			assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
+			assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
+			assertEquals(0, checkpoint.getCollectedStates().size());
+			assertFalse(checkpoint.isDiscarded());
+			assertFalse(checkpoint.isFullyAcknowledged());
+			
+			// check that the vertices received the trigger checkpoint message
+			{
+				TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointId, timestamp);
+				TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointId, timestamp);
+				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
+			}
+			
+			// acknowledge from one of the tasks
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
+			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
+			assertFalse(checkpoint.isDiscarded());
+			assertFalse(checkpoint.isFullyAcknowledged());
+
+			// acknowledge the same task again (should not matter)
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+			assertFalse(checkpoint.isDiscarded());
+			assertFalse(checkpoint.isFullyAcknowledged());
+
+			// acknowledge the other task.
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));
+			
+			// the checkpoint is internally converted to a successful checkpoint and the
+			// pending checkpoint object is disposed
+			assertTrue(checkpoint.isDiscarded());
+			
+			// now we should have a completed checkpoint
+			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			
+			// validate that the relevant tasks got a confirmation message
+			{
+				ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointId);
+				ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointId);
+				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
+				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
+			}
+			
+			SuccessfulCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
+			assertEquals(jid, success.getJobId());
+			assertEquals(timestamp, success.getTimestamp());
+			assertEquals(checkpoint.getCheckpointId(), success.getCheckpointID());
+			assertTrue(success.getStates().isEmpty());
+			
+			// ---------------
+			// trigger another checkpoint and see that this one replaces the other checkpoint
+			// ---------------
+			final long timestampNew = timestamp + 7;
+			coord.triggerCheckpoint(timestampNew);
+
+			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
+			
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			
+			SuccessfulCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
+			assertEquals(jid, successNew.getJobId());
+			assertEquals(timestampNew, successNew.getTimestamp());
+			assertEquals(checkpointIdNew, successNew.getCheckpointID());
+			assertTrue(successNew.getStates().isEmpty());
+
+			// validate that the relevant tasks got the trigger and confirmation messages
+			{
+				TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew);
+				TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew);
+				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
+
+				ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointIdNew);
+				ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointIdNew);
+				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
+				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
+			}
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	
+	@Test
+	public void testMultipleConcurrentCheckpoints() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp1 = System.currentTimeMillis();
+			final long timestamp2 = timestamp1 + 8617;
+
+			// create some mock execution vertices
+			
+			final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
+
+			final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
+
+			final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+			
+			ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
+			ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
+
+			ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
+			ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
+			ExecutionVertex ackVertex3 = mockExecutionVertex(ackAttemptID3);
+			
+			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+			
+			// set up the coordinator and validate the initial state
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid, 2, 600000,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
+					new ExecutionVertex[] { commitVertex });
+			
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp1));
+
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			
+			PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
+			long checkpointId1 = pending1.getCheckpointId();
+
+			// trigger messages should have been sent
+			verify(triggerVertex1, times(1)).sendMessageToCurrentExecution(
+					new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId1, timestamp1), triggerAttemptID1);
+			verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
+					new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);
+			
+			// acknowledge one of the three tasks
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
+			
+			// start the second checkpoint
+			// trigger the second checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp2));
+
+			assertEquals(2, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			PendingCheckpoint pending2;
+			{
+				Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
+				PendingCheckpoint cc1 = all.next();
+				PendingCheckpoint cc2 = all.next();
+				pending2 = pending1 == cc1 ? cc2 : cc1;
+			}
+			long checkpointId2 = pending2.getCheckpointId();
+
+			// trigger messages should have been sent
+			verify(triggerVertex1, times(1)).sendMessageToCurrentExecution(
+					new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId2, timestamp2), triggerAttemptID1);
+			verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
+					new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId2, timestamp2), triggerAttemptID2);
+			
+			// we acknowledge the remaining two tasks from the first
+			// checkpoint and two tasks from the second checkpoint
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
+			
+			// now, the first checkpoint should be confirmed
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertTrue(pending1.isDiscarded());
+			
+			// the first confirm message should be out
+			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
+					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId1), commitAttemptID);
+			
+			// send the last remaining ack for the second checkpoint
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
+
+			// now, the second checkpoint should be confirmed
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertTrue(pending2.isDiscarded());
+
+			// the second commit message should be out
+			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
+					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2), commitAttemptID);
+			
+			// validate the committed checkpoints
+			List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints();
+			
+			SuccessfulCheckpoint sc1 = scs.get(0);
+			assertEquals(checkpointId1, sc1.getCheckpointID());
+			assertEquals(timestamp1, sc1.getTimestamp());
+			assertEquals(jid, sc1.getJobId());
+			assertTrue(sc1.getStates().isEmpty());
+			
+			SuccessfulCheckpoint sc2 = scs.get(1);
+			assertEquals(checkpointId2, sc2.getCheckpointID());
+			assertEquals(timestamp2, sc2.getTimestamp());
+			assertEquals(jid, sc2.getJobId());
+			assertTrue(sc2.getStates().isEmpty());
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Triggers two overlapping checkpoints, acknowledges the first one only partially
+	 * and the second one completely, and then checks that:
+	 * <ul>
+	 *   <li>both pending checkpoints are marked as discarded,</li>
+	 *   <li>exactly one successful checkpoint (the second) is retained,</li>
+	 *   <li>a confirm message for the second checkpoint was sent to the commit vertex,</li>
+	 *   <li>a late acknowledge message for the first checkpoint does not throw.</li>
+	 * </ul>
+	 */
+	@Test
+	public void testSuccessfulCheckpointSubsumesUnsuccessful() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp1 = System.currentTimeMillis();
+			// distinct timestamp for the second checkpoint, so the two can be told apart below
+			final long timestamp2 = timestamp1 + 1552;
+
+			// create some mock execution vertices
+			final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
+
+			final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
+
+			final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+			ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
+			ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
+
+			ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
+			ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
+			ExecutionVertex ackVertex3 = mockExecutionVertex(ackAttemptID3);
+
+			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+			// set up the coordinator and validate the initial state
+			// NOTE(review): 10 is presumably the number of retained successful checkpoints and
+			// 600000 the checkpoint timeout in milliseconds - confirm against the constructor
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid, 10, 600000,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
+					new ExecutionVertex[] { commitVertex });
+
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should succeed
+			assertTrue(coord.triggerCheckpoint(timestamp1));
+
+			assertEquals(1, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
+			long checkpointId1 = pending1.getCheckpointId();
+
+			// trigger messages for the first checkpoint should have been sent
+			verify(triggerVertex1, times(1)).sendMessageToCurrentExecution(
+					new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId1, timestamp1), triggerAttemptID1);
+			verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
+					new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);
+
+			// acknowledge one of the three tasks for the first checkpoint
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));
+
+			// trigger the second checkpoint while the first one is still pending.
+			// this should succeed as well
+			assertTrue(coord.triggerCheckpoint(timestamp2));
+
+			assertEquals(2, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// pick the pending checkpoint that is not the first one; this does not rely
+			// on the iteration order of the pending checkpoints
+			PendingCheckpoint pending2;
+			{
+				Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
+				PendingCheckpoint cc1 = all.next();
+				PendingCheckpoint cc2 = all.next();
+				pending2 = pending1 == cc1 ? cc2 : cc1;
+			}
+			long checkpointId2 = pending2.getCheckpointId();
+
+			// trigger messages for the second checkpoint should have been sent
+			verify(triggerVertex1, times(1)).sendMessageToCurrentExecution(
+					new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId2, timestamp2), triggerAttemptID1);
+			verify(triggerVertex2, times(1)).sendMessageToCurrentExecution(
+					new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId2, timestamp2), triggerAttemptID2);
+
+			// we acknowledge one more task from the first checkpoint and the second
+			// checkpoint completely. The second checkpoint should then subsume the first checkpoint
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));
+
+			// now, the second checkpoint should be confirmed, and the first discarded.
+			// actually both pending checkpoints are discarded, and the second has been transformed
+			// into a successful checkpoint
+			assertTrue(pending1.isDiscarded());
+			assertTrue(pending2.isDiscarded());
+			
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// validate the committed checkpoint: it must be the second one
+			List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints();
+			SuccessfulCheckpoint success = scs.get(0);
+			assertEquals(checkpointId2, success.getCheckpointID());
+			assertEquals(timestamp2, success.getTimestamp());
+			assertEquals(jid, success.getJobId());
+			assertTrue(success.getStates().isEmpty());
+
+			// the confirm message for the second checkpoint should be out
+			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
+					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2), commitAttemptID);
+
+			// send the last remaining ack for the first (already discarded) checkpoint.
+			// this must not throw; nothing further is asserted about its effect
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
+			
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			// any exception fails the test, with the stack trace printed for diagnosis
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	
+	/**
+	 * Triggers a single checkpoint, acknowledges only one of its two ack tasks, and
+	 * then waits for the coordinator's timeout to discard the pending checkpoint.
+	 * Afterwards no checkpoint may be pending or retained, and no confirm message
+	 * may have been sent to the commit vertex.
+	 */
+	@Test
+	public void testCheckpointTimeoutIsolated() {
+		try {
+			final JobID jobId = new JobID();
+			final long triggerTimestamp = System.currentTimeMillis();
+
+			// attempt ids of the mocked trigger, acknowledge and commit vertices
+			final ExecutionAttemptID triggerId = new ExecutionAttemptID();
+			final ExecutionAttemptID firstAckId = new ExecutionAttemptID();
+			final ExecutionAttemptID secondAckId = new ExecutionAttemptID();
+			final ExecutionAttemptID commitId = new ExecutionAttemptID();
+
+			final ExecutionVertex committer = mockExecutionVertex(commitId);
+
+			final ExecutionVertex[] tasksToTrigger = { mockExecutionVertex(triggerId) };
+			final ExecutionVertex[] tasksToAck = {
+					mockExecutionVertex(firstAckId), mockExecutionVertex(secondAckId) };
+			final ExecutionVertex[] tasksToCommit = { committer };
+
+			// the coordinator is configured with a checkpoint timeout of 200 milliseconds
+			final CheckpointCoordinator coordinator = new CheckpointCoordinator(
+					jobId, 2, 200, tasksToTrigger, tasksToAck, tasksToCommit);
+
+			// start one checkpoint ...
+			assertTrue(coordinator.triggerCheckpoint(triggerTimestamp));
+			assertEquals(1, coordinator.getNumberOfPendingCheckpoints());
+
+			final PendingCheckpoint pending =
+					coordinator.getPendingCheckpoints().values().iterator().next();
+			assertFalse(pending.isDiscarded());
+
+			// ... and acknowledge it for the first ack task only, so it cannot complete
+			final long pendingId = pending.getCheckpointId();
+			coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, firstAckId, pendingId));
+
+			// poll every 250 msecs, for at most 5 seconds, until the checkpoint has been
+			// discarded. the generous bound gives slow build servers enough headroom
+			final long deadline = System.currentTimeMillis() + 5000;
+			while (true) {
+				Thread.sleep(250);
+				if (pending.isDiscarded() || System.currentTimeMillis() >= deadline) {
+					break;
+				}
+			}
+
+			assertTrue("Checkpoint was not canceled by the timeout", pending.isDiscarded());
+			assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
+			assertEquals(0, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// the commit vertex must never have received a confirm message
+			verify(committer, times(0)).sendMessageToCurrentExecution(
+					any(ConfirmCheckpoint.class), any(ExecutionAttemptID.class));
+
+			coordinator.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Sends acknowledge messages that do not match the pending checkpoint (wrong job,
+	 * unknown checkpoint id, unknown execution attempt) and checks that the coordinator
+	 * accepts each of them without throwing an exception.
+	 */
+	@Test
+	public void handleMessagesForNonExistingCheckpoints() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
+
+			// create some mock execution vertices and trigger a checkpoint
+
+			final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
+			final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+			ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+			ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
+			ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
+			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid, 2, 200000,
+					new ExecutionVertex[] { triggerVertex },
+					new ExecutionVertex[] { ackVertex1, ackVertex2 },
+					new ExecutionVertex[] { commitVertex });
+
+			assertTrue(coord.triggerCheckpoint(timestamp));
+
+			// id of the one checkpoint that is pending at this point
+			final long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
+
+			// derive the "unknown" id from the real one: a hard-coded constant could
+			// coincide with the id of the pending checkpoint, in which case the message
+			// below would be a regular acknowledgement and the case would not be tested
+			final long unknownCheckpointId = checkpointId + 1;
+
+			// send some messages that do not belong to either the job or any
+			// of the vertices that need to be acknowledged.
+			// none of the messages should throw an exception
+
+			// wrong job id
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId));
+
+			// unknown checkpoint
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, unknownCheckpointId));
+
+			// unknown ack vertex
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId));
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			// any exception fails the test, with the stack trace printed for diagnosis
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Builds a Mockito mock of an {@link ExecutionVertex} whose current execution
+	 * attempt is a mocked {@link Execution} reporting the given attempt id.
+	 *
+	 * @param attemptID the attempt id returned by the mocked execution
+	 * @return the mocked execution vertex
+	 */
+	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
+		final ExecutionVertex mockedVertex = mock(ExecutionVertex.class);
+		final Execution mockedAttempt = mock(Execution.class);
+
+		// vertex -> current attempt -> attempt id
+		when(mockedAttempt.getAttemptId()).thenReturn(attemptID);
+		when(mockedVertex.getCurrentExecutionAttempt()).thenReturn(mockedAttempt);
+
+		return mockedVertex;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f0ce142/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
new file mode 100644
index 0000000..fe1a598
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.checkpoint.AbortCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Tests for the checkpoint messages. Each message is copied via
+ * {@code CommonTestUtils.createCopySerializable} and the copy is compared with the
+ * original through {@code equals()}, {@code hashCode()} and {@code toString()}.
+ */
+public class CheckpointMessagesTest {
+
+	/**
+	 * Checks the {@link ConfirmCheckpoint}, {@link TriggerCheckpoint} and
+	 * {@link AbortCheckpoint} messages.
+	 */
+	@Test
+	public void testTriggerAndConfirmCheckpoint() {
+		try {
+			ConfirmCheckpoint cc = new ConfirmCheckpoint(new JobID(), new ExecutionAttemptID(), 45287698767345L);
+			testSerializabilityEqualsHashCode(cc);
+
+			TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
+			testSerializabilityEqualsHashCode(tc);
+
+			AbortCheckpoint ac = new AbortCheckpoint(new JobID(), new ExecutionAttemptID(), 1365762983745L);
+			testSerializabilityEqualsHashCode(ac);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Checks the {@link AcknowledgeCheckpoint} message, once without a state handle
+	 * and once carrying a {@link MyHandle}.
+	 */
+	@Test
+	public void testConfirmTaskCheckpointed() {
+		try {
+			AcknowledgeCheckpoint noState = new AcknowledgeCheckpoint(
+											new JobID(), new ExecutionAttemptID(), 569345L);
+
+			AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint(
+											new JobID(), new ExecutionAttemptID(), 87658976143L, new MyHandle());
+
+			testSerializabilityEqualsHashCode(noState);
+			testSerializabilityEqualsHashCode(withState);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Copies the given object and asserts that the copy equals the original, has the
+	 * same hash code, and that both produce a non-null string representation.
+	 *
+	 * @param o the object to copy and compare
+	 * @throws IOException propagated from the copy utility
+	 */
+	private static void testSerializabilityEqualsHashCode(Serializable o) throws IOException {
+		Object copy = CommonTestUtils.createCopySerializable(o);
+		assertEquals(o, copy);
+		assertEquals(o.hashCode(), copy.hashCode());
+		assertNotNull(o.toString());
+		assertNotNull(copy.toString());
+	}
+
+	/**
+	 * Minimal state handle for the tests. All instances are equal to each other, so
+	 * a copied handle compares equal to the original.
+	 */
+	private static class MyHandle implements StateHandle {
+
+		private static final long serialVersionUID = 8128146204128728332L;
+
+		/** Returns no state; the tests in this class never call this method. */
+		@Override
+		public Map<String, OperatorState<?>> getState(ClassLoader userClassloader) {
+			return null;
+		}
+
+		/**
+		 * Two handles are equal if they are of exactly the same class. A {@code null}
+		 * argument yields {@code false}, as the {@code Object.equals} contract demands,
+		 * instead of a {@code NullPointerException}.
+		 */
+		@Override
+		public boolean equals(Object obj) {
+			return obj != null && obj.getClass() == this.getClass();
+		}
+
+		/** Consistent with {@link #equals(Object)}: depends on the class only. */
+		@Override
+		public int hashCode() {
+			return getClass().hashCode();
+		}
+	}
+}