You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/12 23:03:13 UTC

[02/10] flink git commit: [FLINK-1953] [runtime] Integrate new snapshot checkpoint coordinator with jobgraph and execution graph

[FLINK-1953] [runtime] Integrate new snapshot checkpoint coordinator with jobgraph and execution graph

This closes #651


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b7f8aa1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b7f8aa1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b7f8aa1

Branch: refs/heads/master
Commit: 9b7f8aa121e4a231632296d0809029aca9ebde6a
Parents: ff750e6
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Apr 30 19:59:36 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 21:35:57 2015 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 169 ++++++-
 .../CheckpointCoordinatorDeActivator.java       |  56 +++
 .../runtime/checkpoint/PendingCheckpoint.java   |  11 +-
 .../flink/runtime/checkpoint/StateForTask.java  |  12 +-
 .../deployment/TaskDeploymentDescriptor.java    |  32 +-
 .../flink/runtime/execution/Environment.java    |  24 +-
 .../flink/runtime/executiongraph/Execution.java |  20 +-
 .../runtime/executiongraph/ExecutionGraph.java  | 484 +++++++++++--------
 .../executiongraph/ExecutionJobVertex.java      |   3 +-
 .../runtime/executiongraph/ExecutionVertex.java |  25 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  44 +-
 .../jobgraph/tasks/AbstractInvokable.java       |   6 +-
 .../tasks/CheckpointCommittingOperator.java     |  24 +
 .../jobgraph/tasks/CheckpointedOperator.java    |  24 +
 .../jobgraph/tasks/JobSnapshottingSettings.java |  97 ++++
 .../jobgraph/tasks/OperatorStateCarrier.java    |  16 +-
 .../messages/checkpoint/AbortCheckpoint.java    |  49 --
 .../checkpoint/AcknowledgeCheckpoint.java       |   8 +-
 .../messages/checkpoint/ConfirmCheckpoint.java  |  28 +-
 .../flink/runtime/state/LocalStateHandle.java   |  27 +-
 .../apache/flink/runtime/state/StateHandle.java |  11 +-
 .../apache/flink/runtime/state/StateUtils.java  |  54 +++
 .../runtime/taskmanager/RuntimeEnvironment.java |  25 +-
 .../apache/flink/runtime/taskmanager/Task.java  | 150 +++++-
 .../flink/runtime/util/SerializableObject.java  |  28 ++
 .../flink/runtime/jobmanager/JobManager.scala   | 108 +++--
 .../StreamCheckpointCoordinator.scala           | 151 ------
 .../messages/CheckpointingMessages.scala        |  52 --
 .../flink/runtime/taskmanager/TaskManager.scala |  44 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  14 +-
 .../checkpoint/CheckpointStateRestoreTest.java  | 235 +++++++++
 .../checkpoint/CoordinatorShutdownTest.java     | 144 ++++++
 .../messages/CheckpointMessagesTest.java        |  17 +-
 .../operators/testutils/MockEnvironment.java    |  11 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  70 ++-
 .../streaming/runtime/tasks/StreamTask.java     |  41 +-
 .../StreamCheckpointingITCase.java              |   4 +-
 .../TaskManagerFailureRecoveryITCase.java       |   2 +-
 38 files changed, 1630 insertions(+), 690 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9647ca4..b3f6587 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,10 +18,17 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
@@ -41,7 +48,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- *
+ * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
+ * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
+ * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
+ * reported by the tasks that acknowledge the checkpoint.
  */
 public class CheckpointCoordinator {
 	
@@ -76,13 +86,17 @@ public class CheckpointCoordinator {
 
 	private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger();
 
-	/** The timer that processes the checkpoint timeouts */
-	private final Timer timeoutTimer;
+	/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
+	private final Timer timer;
 	
 	private final long checkpointTimeout;
 	
 	private final int numSuccessfulCheckpointsToRetain;
 	
+	private TimerTask periodicScheduler;
+	
+	private ActorRef jobStatusListener;
+	
 	private boolean shutdown;
 	
 	// --------------------------------------------------------------------------------------------
@@ -114,9 +128,13 @@ public class CheckpointCoordinator {
 		this.completedCheckpoints = new ArrayDeque<SuccessfulCheckpoint>(numSuccessfulCheckpointsToRetain + 1);
 		this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
 
-		timeoutTimer = new Timer("Checkpoint Timeout Handler", true);
+		timer = new Timer("Checkpoint Timer", true);
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Clean shutdown
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Shuts down the checkpoint coordinator.
 	 * 
@@ -129,9 +147,22 @@ public class CheckpointCoordinator {
 				return;
 			}
 			shutdown = true;
+			LOG.info("Stopping checkpoint coordinator jor job " + job);
 			
 			// shut down the thread that handles the timeouts
-			timeoutTimer.cancel();
+			timer.cancel();
+			
+			// make sure that the actor does not linger
+			if (jobStatusListener != null) {
+				jobStatusListener.tell(PoisonPill.getInstance(), ActorRef.noSender());
+				jobStatusListener = null;
+			}
+			
+			// the scheduling thread needs also to go away
+			if (periodicScheduler != null) {
+				periodicScheduler.cancel();
+				periodicScheduler = null;
+			}
 			
 			// clear and discard all pending checkpoints
 			for (PendingCheckpoint pending : pendingCheckpoints.values()) {
@@ -146,6 +177,10 @@ public class CheckpointCoordinator {
 			completedCheckpoints.clear();
 		}
 	}
+	
+	public boolean isShutdown() {
+		return shutdown;
+	}
 
 	// --------------------------------------------------------------------------------------------
 	//  Handling checkpoints and messages
@@ -235,7 +270,7 @@ public class CheckpointCoordinator {
 					throw new IllegalStateException("Checkpoint coordinator has been shutdown.");
 				}
 				pendingCheckpoints.put(checkpointID, checkpoint);
-				timeoutTimer.schedule(canceller, checkpointTimeout);
+				timer.schedule(canceller, checkpointTimeout);
 			}
 
 			// send the messages to the tasks that trigger their checkpoint
@@ -270,7 +305,8 @@ public class CheckpointCoordinator {
 		}
 		
 		final long checkpointId = message.getCheckpointId();
-		boolean checkpointCompleted = false;
+
+		SuccessfulCheckpoint completed = null;
 		
 		synchronized (lock) {
 			// we need to check inside the lock for being shutdown as well, otherwise we
@@ -286,7 +322,7 @@ public class CheckpointCoordinator {
 					if (checkpoint.isFullyAcknowledged()) {
 						LOG.info("Completed checkpoint " + checkpointId);
 
-						SuccessfulCheckpoint completed = checkpoint.toCompletedCheckpoint();
+						completed = checkpoint.toCompletedCheckpoint();
 						completedCheckpoints.addLast(completed);
 						if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
 							completedCheckpoints.removeFirst();
@@ -295,8 +331,6 @@ public class CheckpointCoordinator {
 						rememberRecentCheckpointId(checkpointId);
 						
 						dropSubsumedCheckpoints(completed.getTimestamp());
-						
-						checkpointCompleted = true;
 					}
 				}
 				else {
@@ -323,12 +357,13 @@ public class CheckpointCoordinator {
 		
 		// send the confirmation messages to the necessary targets. we do this here
 		// to be outside the lock scope
-		if (checkpointCompleted) {
+		if (completed != null) {
+			final long timestamp = completed.getTimestamp();
 			for (ExecutionVertex ev : tasksToCommitTo) {
 				Execution ee = ev.getCurrentExecutionAttempt();
 				if (ee != null) {
 					ExecutionAttemptID attemptId = ee.getAttemptId();
-					ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId);
+					ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId, timestamp);
 					ev.sendMessageToCurrentExecution(confirmMessage, ee.getAttemptId());
 				}
 			}
@@ -355,6 +390,64 @@ public class CheckpointCoordinator {
 	}
 
 	// --------------------------------------------------------------------------------------------
+	//  Checkpoint State Restoring
+	// --------------------------------------------------------------------------------------------
+
+	public void restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex> tasks,
+												boolean errorIfNoCheckpoint,
+												boolean allOrNothingState) throws Exception {
+		synchronized (lock) {
+			if (shutdown) {
+				throw new IllegalStateException("CheckpointCoordinator is hut down");
+			}
+			
+			if (completedCheckpoints.isEmpty()) {
+				if (errorIfNoCheckpoint) {
+					throw new IllegalStateException("No completed checkpoint available");
+				} else {
+					return;
+				}
+			}
+			
+			// restore from the latest checkpoint
+			SuccessfulCheckpoint latest = completedCheckpoints.getLast();
+						
+			if (allOrNothingState) {
+				Map<ExecutionJobVertex, Integer> stateCounts = new HashMap<ExecutionJobVertex, Integer>();
+
+				for (StateForTask state : latest.getStates()) {
+					ExecutionJobVertex vertex = tasks.get(state.getOperatorId());
+					Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
+					exec.setInitialState(state.getState());
+
+					Integer count = stateCounts.get(vertex);
+					if (count != null) {
+						stateCounts.put(vertex, count+1);
+					} else {
+						stateCounts.put(vertex, 1);
+					}
+				}
+				
+				// validate that either all task vertices have state, or none
+				for (Map.Entry<ExecutionJobVertex, Integer> entry : stateCounts.entrySet()) {
+					ExecutionJobVertex vertex = entry.getKey();
+					if (entry.getValue() != vertex.getParallelism()) {
+						throw new IllegalStateException(
+								"The checkpoint contained state only for a subset of tasks for vertex " + vertex);
+					}
+				}
+			}
+			else {
+				for (StateForTask state : latest.getStates()) {
+					ExecutionJobVertex vertex = tasks.get(state.getOperatorId());
+					Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
+					exec.setInitialState(state.getState());
+				}
+			}
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
 	//  Accessors
 	// --------------------------------------------------------------------------------------------
 	
@@ -377,4 +470,56 @@ public class CheckpointCoordinator {
 			return new ArrayList<SuccessfulCheckpoint>(this.completedCheckpoints);
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Periodic scheduling of checkpoints
+	// --------------------------------------------------------------------------------------------
+	
+	public void startPeriodicCheckpointScheduler(long interval) {
+		synchronized (lock) {
+			if (shutdown) {
+				throw new IllegalArgumentException("Checkpoint coordinator is shut down");
+			}
+			
+			// cancel any previous scheduler
+			stopPeriodicCheckpointScheduler();
+			
+			// start a new scheduler
+			periodicScheduler = new TimerTask() {
+				@Override
+				public void run() {
+					try {
+						triggerCheckpoint();
+					}
+					catch (Exception e) {
+						LOG.error("Exception while triggering checkpoint", e);
+					}
+				}
+			};
+			timer.scheduleAtFixedRate(periodicScheduler, interval, interval);
+		}
+	}
+	
+	public void stopPeriodicCheckpointScheduler() {
+		synchronized (lock) {
+			if (periodicScheduler != null) {
+				periodicScheduler.cancel();
+				periodicScheduler = null;
+			}
+		}
+	}
+	
+	public ActorRef createJobStatusListener(ActorSystem actorSystem, long checkpointInterval) {
+		synchronized (lock) {
+			if (shutdown) {
+				throw new IllegalArgumentException("Checkpoint coordinator is shut down");
+			}
+
+			if (jobStatusListener == null) {
+				Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, checkpointInterval);
+				jobStatusListener = actorSystem.actorOf(props);
+			}
+			return jobStatusListener;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
new file mode 100644
index 0000000..a6c4d76
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.ExecutionGraphMessages;
+
+/**
+ * This actor listens to changes in the JobStatus and activates or deactivates the periodic
+ * checkpoint scheduler.
+ */
+public class CheckpointCoordinatorDeActivator extends UntypedActor {
+
+	private final CheckpointCoordinator coordinator;
+	private final long interval;
+	
+	public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator, long interval) {
+		this.coordinator = coordinator;
+		this.interval = interval;
+	}
+
+	@Override
+	public void onReceive(Object message) {
+		if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
+			JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();
+			
+			if (status == JobStatus.RUNNING) {
+				// start the checkpoint scheduler
+				coordinator.startPeriodicCheckpointScheduler(interval);
+			}
+			else {
+				// anything else should stop the trigger for now
+				coordinator.stopPeriodicCheckpointScheduler();
+			}
+		}
+		
+		// we ignore all other messages
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index e221238..f25bff9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -31,6 +32,9 @@ import java.util.Map;
  * A pending checkpoint is a checkpoint that has been started, but has not been
  * acknowledged by all tasks that need to acknowledge it. Once all tasks have
  * acknowledged it, it becomes a {@link SuccessfulCheckpoint}.
+ * 
+ * <p>Note that the pending checkpoint, as well as the successful checkpoint keep the
+ * state handles always as serialized values, never as actual values.</p>
  */
 public class PendingCheckpoint {
 	
@@ -117,12 +121,12 @@ public class PendingCheckpoint {
 				return completed;
 			}
 			else {
-				throw new IllegalStateException("Cannot complete checkpoint while nit all tasks are acknowledged");
+				throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
 			}
 		}
 	}
 	
-	public boolean acknowledgeTask(ExecutionAttemptID attemptID, StateHandle state) {
+	public boolean acknowledgeTask(ExecutionAttemptID attemptID, SerializedValue<StateHandle<?>> state) {
 		synchronized (lock) {
 			if (discarded) {
 				return false;
@@ -158,6 +162,7 @@ public class PendingCheckpoint {
 
 	@Override
 	public String toString() {
-		return "";
+		return String.format("PendingCheckpoint %d @ %d - confirmed=%d, pending=%d",
+				checkpointId, checkpointTimestamp, getNumberOfAcknowledgedTasks(), getNumberOfNonAcknowledgedTasks());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
index 83a6dc8..26b3eb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -20,16 +20,22 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
 
 /**
  * Simple bean to describe the state belonging to a parallel operator.
  * Since we hold the state across execution attempts, we identify a task by its
  * JobVertexId and subtask index.
+ * 
+ * The state itself is kept in serialized from, since the checkpoint coordinator itself
+ * is never looking at it anyways and only sends it back out in case of a recovery.
+ * Furthermore, the state may involve user-defined classes that are not accessible without
+ * the respective classloader.
  */
 public class StateForTask {
 
 	/** The state of the parallel operator */
-	private final StateHandle state;
+	private final SerializedValue<StateHandle<?>> state;
 
 	/** The vertex id of the parallel operator */
 	private final JobVertexID operatorId;
@@ -37,7 +43,7 @@ public class StateForTask {
 	/** The index of the parallel subtask */
 	private final int subtask;
 
-	public StateForTask(StateHandle state, JobVertexID operatorId, int subtask) {
+	public StateForTask(SerializedValue<StateHandle<?>> state, JobVertexID operatorId, int subtask) {
 		if (state == null || operatorId == null || subtask < 0) {
 			throw new IllegalArgumentException();
 		}
@@ -49,7 +55,7 @@ public class StateForTask {
 
 	// --------------------------------------------------------------------------------------------
 	
-	public StateHandle getState() {
+	public SerializedValue<StateHandle<?>> getState() {
 		return state;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 5d96903..0a1268d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
 
 import java.io.Serializable;
 import java.util.Collection;
@@ -77,9 +78,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/** The list of JAR files required to run this task. */
 	private final List<BlobKey> requiredJarFiles;
 
-	private StateHandle operatorStates;
-
-
+	private final SerializedValue<StateHandle<?>> operatorState;
+	
 	/**
 	 * Constructs a task deployment descriptor.
 	 */
@@ -89,15 +89,18 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			Configuration taskConfiguration, String invokableClassName,
 			List<ResultPartitionDeploymentDescriptor> producedPartitions,
 			List<InputGateDeploymentDescriptor> inputGates,
-			List<BlobKey> requiredJarFiles, int targetSlotNumber) {
+			List<BlobKey> requiredJarFiles, int targetSlotNumber,
+			SerializedValue<StateHandle<?>> operatorState) {
 
+		checkArgument(indexInSubtaskGroup >= 0);
+		checkArgument(numberOfSubtasks > indexInSubtaskGroup);
+		checkArgument(targetSlotNumber >= 0);
+		
 		this.jobID = checkNotNull(jobID);
 		this.vertexID = checkNotNull(vertexID);
 		this.executionId = checkNotNull(executionId);
 		this.taskName = checkNotNull(taskName);
-		checkArgument(indexInSubtaskGroup >= 0);
 		this.indexInSubtaskGroup = indexInSubtaskGroup;
-		checkArgument(numberOfSubtasks > indexInSubtaskGroup);
 		this.numberOfSubtasks = numberOfSubtasks;
 		this.jobConfiguration = checkNotNull(jobConfiguration);
 		this.taskConfiguration = checkNotNull(taskConfiguration);
@@ -105,8 +108,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.producedPartitions = checkNotNull(producedPartitions);
 		this.inputGates = checkNotNull(inputGates);
 		this.requiredJarFiles = checkNotNull(requiredJarFiles);
-		checkArgument(targetSlotNumber >= 0);
 		this.targetSlotNumber = targetSlotNumber;
+		this.operatorState = operatorState;
 	}
 
 	public TaskDeploymentDescriptor(
@@ -115,14 +118,11 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			Configuration taskConfiguration, String invokableClassName,
 			List<ResultPartitionDeploymentDescriptor> producedPartitions,
 			List<InputGateDeploymentDescriptor> inputGates,
-			List<BlobKey> requiredJarFiles, int targetSlotNumber,
-			StateHandle operatorStates) {
+			List<BlobKey> requiredJarFiles, int targetSlotNumber) {
 
 		this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
 				jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
-				inputGates, requiredJarFiles, targetSlotNumber);
-
-		setOperatorState(operatorStates);
+				inputGates, requiredJarFiles, targetSlotNumber, null);
 	}
 
 	/**
@@ -232,11 +232,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		return strBuilder.toString();
 	}
 
-	public void setOperatorState(StateHandle operatorStates) {
-		this.operatorStates = operatorStates;
-	}
-
-	public StateHandle getOperatorStates() {
-		return operatorStates;
+	public SerializedValue<StateHandle<?>> getOperatorState() {
+		return operatorState;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 081e3ca..755f1ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.execution;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -31,6 +30,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
 
 import java.util.Map;
 import java.util.concurrent.Future;
@@ -148,6 +148,25 @@ public interface Environment {
 	 */
 	void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators);
 
+	/**
+	 * Confirms that the invokable has successfully completed all steps it needed to
+	 * to for the checkpoint with the give checkpoint-ID. This method does not include
+	 * any state in the checkpoint.
+	 * 
+	 * @param checkpointId The ID of the checkpoint.
+	 */
+	void acknowledgeCheckpoint(long checkpointId);
+
+	/**
+	 * Confirms that the invokable has successfully completed all steps it needed to
+	 * to for the checkpoint with the give checkpoint-ID. This method does include
+	 * the given state in the checkpoint.
+	 *
+	 * @param checkpointId The ID of the checkpoint.
+	 * @param state A handle to the state to be included in the checkpoint.   
+	 */
+	void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state);
+
 	// --------------------------------------------------------------------------------------------
 	//  Fields relevant to the I/O system. Should go into Task
 	// --------------------------------------------------------------------------------------------
@@ -159,7 +178,4 @@ public interface Environment {
 	InputGate getInputGate(int index);
 
 	InputGate[] getAllInputGates();
-
-	// this should go away
-	ActorRef getJobManager();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 4e046dd..731d70f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
@@ -129,7 +130,7 @@ public class Execution implements Serializable {
 	
 	private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
 	
-	private StateHandle operatorState;
+	private SerializedValue<StateHandle<?>> operatorState;
 
 	// --------------------------------------------------------------------------------------------
 	
@@ -204,6 +205,13 @@ public class Execution implements Serializable {
 		partialInputChannelDeploymentDescriptors = null;
 	}
 	
+	public void setInitialState(SerializedValue<StateHandle<?>> initialState) {
+		if (state != ExecutionState.CREATED) {
+			throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
+		}
+		this.operatorState = initialState;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Actions
 	// --------------------------------------------------------------------------------------------
@@ -325,7 +333,7 @@ public class Execution implements Serializable {
 						attemptNumber, slot.getInstance().getInstanceConnectionInfo().getHostname()));
 			}
 			
-			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot);
+			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot, operatorState);
 			
 			// register this execution at the execution graph, to receive call backs
 			vertex.getExecutionGraph().registerExecution(this);
@@ -903,12 +911,4 @@ public class Execution implements Serializable {
 		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),
 				(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
 	}
-
-	public void setOperatorState(StateHandle operatorStates) {
-		this.operatorState = operatorStates;
-	}
-
-	public StateHandle getOperatorState() {
-		return operatorState;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d38913e..90cf42e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.ActorContext;
 import akka.actor.ActorRef;
+
+import akka.actor.ActorSystem;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
@@ -32,20 +34,20 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple3;
-import scala.concurrent.duration.Duration;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -54,7 +56,6 @@ import java.util.NoSuchElementException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static akka.dispatch.Futures.future;
@@ -80,8 +81,11 @@ import static akka.dispatch.Futures.future;
  *         about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
  *         address the message receiver.</li>
  * </ul>
- *
- *
+ * 
+ * <p>The ExecutionGraph implements {@link java.io.Serializable}, because it can be archived by
+ * sending it to an archive actor via an actor message. The execution graph does contain some
+ * non-serializable fields. These fields are not required in the archived form and are cleared
+ * in the {@link #prepareForArchiving()} method.</p>
  */
 public class ExecutionGraph implements Serializable {
 
@@ -92,9 +96,15 @@ public class ExecutionGraph implements Serializable {
 
 	/** The log object used for debugging. */
 	static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
+	
+	private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
 
 	// --------------------------------------------------------------------------------------------
 
+	/** The lock used to secure all access to mutable fields, especially the tracking of progress
+	 * within the job. */
+	private final SerializableObject progressLock = new SerializableObject();
+	
 	/** The ID of the job this graph has been built for. */
 	private final JobID jobID;
 
@@ -104,9 +114,6 @@ public class ExecutionGraph implements Serializable {
 	/** The job configuration that was originally attached to the JobGraph. */
 	private final Configuration jobConfiguration;
 
-	/** The classloader for the user code. Needed for calls into user code classes */
-	private ClassLoader userClassLoader;
-
 	/** All job vertices that are part of this graph */
 	private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
 
@@ -123,8 +130,11 @@ public class ExecutionGraph implements Serializable {
 	 * inside the BlobService and are referenced via the BLOB keys. */
 	private final List<BlobKey> requiredJarFiles;
 
+	/** Listeners that receive messages when the entire job switches it status (such as from
+	 * RUNNING to FINISHED) */
 	private final List<ActorRef> jobStatusListenerActors;
 
+	/** Listeners that receive messages whenever a single task execution changes its status */
 	private final List<ActorRef> executionListenerActors;
 
 	/** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when
@@ -133,10 +143,6 @@ public class ExecutionGraph implements Serializable {
 	 * at {@code stateTimestamps[RUNNING.ordinal()]}. */
 	private final long[] stateTimestamps;
 
-	/** The lock used to secure all access to mutable fields, especially the tracking of progress
-	 * within the job. */
-	private final Object progressLock = new Object();
-
 	/** The timeout for all messages that require a response/acknowledgement */
 	private final FiniteDuration timeout;
 
@@ -158,8 +164,11 @@ public class ExecutionGraph implements Serializable {
 	 * from results than need to be materialized. */
 	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 
+	/** Flag that indicate whether the executed dataflow should be periodically snapshotted */
+	private boolean snapshotCheckpointsEnabled;
+		
 
-	// ------ Execution status and progress -------
+	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
 	/** Current status of the job execution */
 	private volatile JobStatus state = JobStatus.CREATED;
@@ -168,31 +177,36 @@ public class ExecutionGraph implements Serializable {
 	 * that was not recoverable and triggered job failure */
 	private volatile Throwable failureCause;
 
-	/** The scheduler to use for scheduling new tasks as they are needed */
-	private Scheduler scheduler;
-
 	/** The position of the vertex that is next expected to finish.
 	 * This is an index into the "verticesInCreationOrder" collection.
 	 * Once this value has reached the number of vertices, the job is done. */
-	private int nextVertexToFinish;
-
-
-
-	private ActorContext parentContext;
-
-	private  ActorRef stateCheckpointerActor;
-
-	private boolean checkpointingEnabled;
+	private volatile int nextVertexToFinish;
+	
+	
+	// ------ Fields that are relevant to the execution and need to be cleared before archiving  -------
 
-	private long checkpointingInterval = 5000;
+	/** The scheduler to use for scheduling new tasks as they are needed */
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
+	private Scheduler scheduler;
 
-	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
-		this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>());
-	}
+	/** The classloader for the user code. Needed for calls into user code classes */
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
+	private ClassLoader userClassLoader;
+	
+	/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
+	private CheckpointCoordinator checkpointCoordinator;
+	
+	
+	// --------------------------------------------------------------------------------------------
+	//   Constructors
+	// --------------------------------------------------------------------------------------------
 
-	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig,
-						FiniteDuration timeout, List<BlobKey> requiredJarFiles) {
-		this(jobId, jobName, jobConfig, timeout, requiredJarFiles, Thread.currentThread().getContextClassLoader());
+	/**
+	 * This constructor is for tests only, because it does not include class loading information.
+	 */
+	ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
+		this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>(), ExecutionGraph.class.getClassLoader());
 	}
 
 	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout,
@@ -224,18 +238,8 @@ public class ExecutionGraph implements Serializable {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
-	public void setStateCheckpointerActor(ActorRef stateCheckpointerActor) {
-		this.stateCheckpointerActor = stateCheckpointerActor;
-	}
-
-	public ActorRef getStateCheckpointerActor() {
-		return stateCheckpointerActor;
-	}
-
-	public void setParentContext(ActorContext parentContext) {
-		this.parentContext = parentContext;
-	}
+	//  Configuration of Data-flow wide execution settings  
+	// --------------------------------------------------------------------------------------------
 
 	public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
 		if (numberOfRetriesLeft < -1) {
@@ -259,46 +263,97 @@ public class ExecutionGraph implements Serializable {
 		return delayBeforeRetrying;
 	}
 
-	public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
-					+ "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size()));
+	public boolean isQueuedSchedulingAllowed() {
+		return this.allowQueuedScheduling;
+	}
+
+	public void setQueuedSchedulingAllowed(boolean allowed) {
+		this.allowQueuedScheduling = allowed;
+	}
+
+	public void setScheduleMode(ScheduleMode scheduleMode) {
+		this.scheduleMode = scheduleMode;
+	}
+
+	public ScheduleMode getScheduleMode() {
+		return scheduleMode;
+	}
+
+	public void enableSnaphotCheckpointing(long interval, long checkpointTimeout,
+											List<ExecutionJobVertex> verticesToTrigger,
+											List<ExecutionJobVertex> verticesToWaitFor,
+											List<ExecutionJobVertex> verticesToCommitTo,
+											ActorSystem actorSystem)
+	{
+		// simple sanity checks
+		if (interval < 10 || checkpointTimeout < 10) {
+			throw new IllegalArgumentException();
+		}
+		if (state != JobStatus.CREATED) {
+			throw new IllegalStateException("Job must be in CREATED state");
 		}
+
+		ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
+		ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
+		ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
 		
-		final long createTimestamp = System.currentTimeMillis();
+		// disable to make sure existing checkpoint coordinators are cleared
+		disableSnaphotCheckpointing();
 		
-		for (AbstractJobVertex jobVertex : topologiallySorted) {
-			
-			// create the execution job vertex and attach it to the graph
-			ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
-			ejv.connectToPredecessors(this.intermediateResults);
-			
-			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
-			if (previousTask != null) {
-				throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
-						jobVertex.getID(), ejv, previousTask));
-			}
-			
-			for (IntermediateResult res : ejv.getProducedDataSets()) {
-				IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
-				if (previousDataSet != null) {
-					throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
-							res.getId(), res, previousDataSet));
-				}
-			}
-			
-			this.verticesInCreationOrder.add(ejv);
+		// create the coordinator that triggers and commits checkpoints and holds the state 
+		snapshotCheckpointsEnabled = true;
+		checkpointCoordinator = new CheckpointCoordinator(jobID, NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN,
+				checkpointTimeout, tasksToTrigger, tasksToWaitFor, tasksToCommitTo);
+		
+		// the periodic checkpoint scheduler is activated and deactivated as a result of
+		// job status changes (running -> on, all other states -> off)
+		registerJobStatusListener(checkpointCoordinator.createJobStatusListener(actorSystem, interval));
+	}
+	
+	public void disableSnaphotCheckpointing() {
+		if (state != JobStatus.CREATED) {
+			throw new IllegalStateException("Job must be in CREATED state");
+		}
+		
+		snapshotCheckpointsEnabled = false;
+		if (checkpointCoordinator != null) {
+			checkpointCoordinator.shutdown();
+			checkpointCoordinator = null;
 		}
 	}
+	
+	public boolean isSnapshotCheckpointsEnabled() {
+		return snapshotCheckpointsEnabled;
+	}
 
-	public void setCheckpointingEnabled(boolean checkpointingEnabled) {
-		this.checkpointingEnabled = checkpointingEnabled;
+	public CheckpointCoordinator getCheckpointCoordinator() {
+		return checkpointCoordinator;
 	}
 
-	public void setCheckpointingInterval(long checkpointingInterval) {
-		this.checkpointingInterval = checkpointingInterval;
+	private ExecutionVertex[] collectExecutionVertices(List<ExecutionJobVertex> jobVertices) {
+		if (jobVertices.size() == 1) {
+			ExecutionJobVertex jv = jobVertices.get(0);
+			if (jv.getGraph() != this) {
+				throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
+			}
+			return jv.getTaskVertices();
+		}
+		else {
+			ArrayList<ExecutionVertex> all = new ArrayList<ExecutionVertex>();
+			for (ExecutionJobVertex jv : jobVertices) {
+				if (jv.getGraph() != this) {
+					throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
+				}
+				all.addAll(Arrays.asList(jv.getTaskVertices()));
+			}
+			return all.toArray(new ExecutionVertex[all.size()]);
+		}
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Properties and Status of the Execution Graph  
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Returns a list of BLOB keys referring to the JAR files required to run this job
 	 * @return list of BLOB keys referring to the JAR files required to run this job
@@ -307,8 +362,6 @@ public class ExecutionGraph implements Serializable {
 		return this.requiredJarFiles;
 	}
 
-	// --------------------------------------------------------------------------------------------
-
 	public Scheduler getScheduler() {
 		return scheduler;
 	}
@@ -396,26 +449,42 @@ public class ExecutionGraph implements Serializable {
 		return this.stateTimestamps[status.ordinal()];
 	}
 
-	public boolean isQueuedSchedulingAllowed() {
-		return this.allowQueuedScheduling;
-	}
+	// --------------------------------------------------------------------------------------------
+	//  Actions
+	// --------------------------------------------------------------------------------------------
 
-	public void setQueuedSchedulingAllowed(boolean allowed) {
-		this.allowQueuedScheduling = allowed;
-	}
+	public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
+					+ "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size()));
+		}
 
-	public void setScheduleMode(ScheduleMode scheduleMode) {
-		this.scheduleMode = scheduleMode;
-	}
+		final long createTimestamp = System.currentTimeMillis();
 
-	public ScheduleMode getScheduleMode() {
-		return scheduleMode;
-	}
+		for (AbstractJobVertex jobVertex : topologiallySorted) {
 
-	// --------------------------------------------------------------------------------------------
-	//  Actions
-	// --------------------------------------------------------------------------------------------
+			// create the execution job vertex and attach it to the graph
+			ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
+			ejv.connectToPredecessors(this.intermediateResults);
 
+			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
+			if (previousTask != null) {
+				throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
+						jobVertex.getID(), ejv, previousTask));
+			}
+
+			for (IntermediateResult res : ejv.getProducedDataSets()) {
+				IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
+				if (previousDataSet != null) {
+					throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
+							res.getId(), res, previousDataSet));
+				}
+			}
+
+			this.verticesInCreationOrder.add(ejv);
+		}
+	}
+	
 	public void scheduleForExecution(Scheduler scheduler) throws JobException {
 		if (scheduler == null) {
 			throw new IllegalArgumentException("Scheduler must not be null.");
@@ -431,32 +500,24 @@ public class ExecutionGraph implements Serializable {
 			switch (scheduleMode) {
 
 				case FROM_SOURCES:
-					// initially, we simply take the ones without inputs.
-					// next, we implement the logic to go back from vertices that need computation
-					// to the ones we need to start running
+					// simply take the vertices without inputs.
 					for (ExecutionJobVertex ejv : this.tasks.values()) {
 						if (ejv.getJobVertex().isInputVertex()) {
 							ejv.scheduleAll(scheduler, allowQueuedScheduling);
 						}
 					}
-
 					break;
 
 				case ALL:
 					for (ExecutionJobVertex ejv : getVerticesTopologically()) {
 						ejv.scheduleAll(scheduler, allowQueuedScheduling);
 					}
-
 					break;
 
 				case BACKTRACKING:
+					// go back from vertices that need computation to the ones we need to run
 					throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
 			}
-
-			if (checkpointingEnabled) {
-				stateCheckpointerActor = StreamCheckpointCoordinator.spawn(parentContext, this,
-						Duration.create(checkpointingInterval, TimeUnit.MILLISECONDS));
-			}
 		}
 		else {
 			throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
@@ -508,6 +569,83 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
+	public void restart() {
+		try {
+			if (state == JobStatus.FAILED) {
+				if (!transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) {
+					throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart.");
+				}
+			}
+
+			synchronized (progressLock) {
+				if (state != JobStatus.RESTARTING) {
+					throw new IllegalStateException("Can only restart job from state restarting.");
+				}
+				if (scheduler == null) {
+					throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
+				}
+
+				this.currentExecutions.clear();
+
+				for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
+					jv.resetForNewExecution();
+				}
+
+				for (int i = 0; i < stateTimestamps.length; i++) {
+					stateTimestamps[i] = 0;
+				}
+				nextVertexToFinish = 0;
+				transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
+				
+				// if we have checkpointed state, reload it into the executions
+				if (checkpointCoordinator != null) {
+					checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
+				}
+			}
+
+			scheduleForExecution(scheduler);
+		}
+		catch (Throwable t) {
+			fail(t);
+		}
+	}
+
+	/**
+	 * This method cleans fields that are irrelevant for the archived execution attempt.
+	 */
+	public void prepareForArchiving() {
+		if (!state.isTerminalState()) {
+			throw new IllegalStateException("Can only archive the job from a terminal state");
+		}
+
+		// clear the non-serializable fields
+		userClassLoader = null;
+		scheduler = null;
+		checkpointCoordinator = null;
+
+		for (ExecutionJobVertex vertex : verticesInCreationOrder) {
+			vertex.prepareForArchiving();
+		}
+
+		intermediateResults.clear();
+		currentExecutions.clear();
+		requiredJarFiles.clear();
+		jobStatusListenerActors.clear();
+		executionListenerActors.clear();
+	}
+
+	/**
+	 * For testing: This waits until the job execution has finished.
+	 * @throws InterruptedException
+	 */
+	public void waitUntilFinished() throws InterruptedException {
+		synchronized (progressLock) {
+			while (nextVertexToFinish < verticesInCreationOrder.size()) {
+				progressLock.wait();
+			}
+		}
+	}
+	
 	private boolean transitionState(JobStatus current, JobStatus newState) {
 		return transitionState(current, newState, null);
 	}
@@ -551,24 +689,32 @@ public class ExecutionGraph implements Serializable {
 				if (nextPos == verticesInCreationOrder.size()) {
 					
 					// we are done, transition to the final state
-					
+					JobStatus current;
 					while (true) {
-						JobStatus current = this.state;
-						if (current == JobStatus.RUNNING && transitionState(current, JobStatus.FINISHED)) {
-							break;
+						current = this.state;
+						
+						if (current == JobStatus.RUNNING) {
+							if (transitionState(current, JobStatus.FINISHED)) {
+								postRunCleanup();
+								break;
+							}
 						}
-						if (current == JobStatus.CANCELLING && transitionState(current, JobStatus.CANCELED)) {
-							break;
+						else if (current == JobStatus.CANCELLING) {
+							if (transitionState(current, JobStatus.CANCELED)) {
+								postRunCleanup();
+								break;
+							}
 						}
-						if (current == JobStatus.FAILING) {
+						else if (current == JobStatus.FAILING) {
 							if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
 								numberOfRetriesLeft--;
 								future(new Callable<Object>() {
 									@Override
 									public Object call() throws Exception {
-										try{
+										try {
 											Thread.sleep(delayBeforeRetrying);
-										}catch(InterruptedException e){
+										}
+										catch(InterruptedException e){
 											// should only happen on shutdown
 										}
 										restart();
@@ -578,13 +724,15 @@ public class ExecutionGraph implements Serializable {
 								break;
 							}
 							else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
+								postRunCleanup();
 								break;
 							}
 						}
-						if (current == JobStatus.CANCELED || current == JobStatus.CREATED || current == JobStatus.FINISHED) {
+						else {
 							fail(new Exception("ExecutionGraph went into final state from state " + current));
 						}
 					}
+					// done transitioning the state
 
 					// also, notify waiters
 					progressLock.notifyAll();
@@ -592,6 +740,19 @@ public class ExecutionGraph implements Serializable {
 			}
 		}
 	}
+	
+	private void postRunCleanup() {
+		try {
+			CheckpointCoordinator coord = this.checkpointCoordinator;
+			this.checkpointCoordinator = null;
+			if (coord != null) {
+				coord.shutdown();
+			}
+		}
+		catch (Exception e) {
+			LOG.error("Error while cleaning up after execution", e);
+		}
+	}
 
 	// --------------------------------------------------------------------------------------------
 	//  Callbacks and Callback Utilities
@@ -623,13 +784,6 @@ public class ExecutionGraph implements Serializable {
 			return false;
 		}
 	}
-	
-	public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , StateHandle> states) {
-		synchronized (this.progressLock) {
-			for (Map.Entry<Tuple3<JobVertexID, Integer, Long>, StateHandle> state : states.entrySet())
-				tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
-		}
-	}
 
 	public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
 
@@ -670,21 +824,19 @@ public class ExecutionGraph implements Serializable {
 	//  Listeners & Observers
 	// --------------------------------------------------------------------------------------------
 
-	public void registerJobStatusListener(ActorRef listener){
-		this.jobStatusListenerActors.add(listener);
-	}
-
-	public void registerExecutionListener(ActorRef listener){
-		this.executionListenerActors.add(listener);
+	public void registerJobStatusListener(ActorRef listener) {
+		if (listener != null) {
+			this.jobStatusListenerActors.add(listener);
+		}
 	}
 
-	public boolean containsJobStatusListener(ActorRef listener) {
-		return this.jobStatusListenerActors.contains(listener);
+	public void registerExecutionListener(ActorRef listener) {
+		if (listener != null) {
+			this.executionListenerActors.add(listener);
+		}
 	}
-
-	/**
-	 * NOTE: This method never throws an error, only logs errors caused by the notified listeners.
-	 */
+	
+	
 	private void notifyJobStatusChange(JobStatus newState, Throwable error) {
 		if (jobStatusListenerActors.size() > 0) {
 			ExecutionGraphMessages.JobStatusChanged message =
@@ -695,10 +847,7 @@ public class ExecutionGraph implements Serializable {
 			}
 		}
 	}
-
-	/**
-	 * NOTE: This method never throws an error, only logs errors caused by the notified listeners.
-	 */
+	
 	void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
 							newExecutionState, Throwable error)
 	{
@@ -722,65 +871,4 @@ public class ExecutionGraph implements Serializable {
 			fail(error);
 		}
 	}
-
-	public void restart() {
-		try {
-			if (state == JobStatus.FAILED) {
-				if (!transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) {
-					throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart.");
-				}
-			}
-			
-			synchronized (progressLock) {
-				if (state != JobStatus.RESTARTING) {
-					throw new IllegalStateException("Can only restart job from state restarting.");
-				}
-				if (scheduler == null) {
-					throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
-				}
-
-				this.currentExecutions.clear();
-
-				for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
-					jv.resetForNewExecution();
-				}
-
-				for (int i = 0; i < stateTimestamps.length; i++) {
-					stateTimestamps[i] = 0;
-				}
-				nextVertexToFinish = 0;
-				transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
-			}
-
-			scheduleForExecution(scheduler);
-		}
-		catch (Throwable t) {
-			fail(t);
-		}
-	}
-	
-	/**
-	 * This method cleans fields that are irrelevant for the archived execution attempt.
-	 */
-	public void prepareForArchiving() {
-		if (!state.isTerminalState()) {
-			throw new IllegalStateException("Can only archive the job from a terminal state");
-		}
-		
-		userClassLoader = null;
-		
-		for (ExecutionJobVertex vertex : verticesInCreationOrder) {
-			vertex.prepareForArchiving();
-		}
-		
-		intermediateResults.clear();
-		currentExecutions.clear();
-		requiredJarFiles.clear();
-		jobStatusListenerActors.clear();
-		executionListenerActors.clear();
-		
-		scheduler = null;
-		parentContext = null;
-		stateCheckpointerActor = null;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index acbc17a..59b3bb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.util.SerializableObject;
 import org.slf4j.Logger;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -52,7 +53,7 @@ public class ExecutionJobVertex implements Serializable {
 	/** Use the same log for all ExecutionGraph classes */
 	private static final Logger LOG = ExecutionGraph.LOG;
 	
-	private final Object stateMonitor = new Object();
+	private final SerializableObject stateMonitor = new SerializableObject();
 	
 	private final ExecutionGraph graph;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a44fc6f..2ad3a55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorRef;
+
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -41,8 +42,11 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
 import org.slf4j.Logger;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -95,8 +99,6 @@ public class ExecutionVertex implements Serializable {
 
 	private volatile boolean scheduleLocalOnly;
 
-	private StateHandle operatorState;
-
 	// --------------------------------------------------------------------------------------------
 
 	public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
@@ -212,14 +214,6 @@ public class ExecutionVertex implements Serializable {
 	public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
 		return currentExecution.getAssignedResourceLocation();
 	}
-
-	public void setOperatorState(StateHandle operatorState) {
-		this.operatorState = operatorState;
-	}
-
-	public StateHandle getOperatorState() {
-		return operatorState;
-	}
 	
 	public ExecutionGraph getExecutionGraph() {
 		return this.jobVertex.getGraph();
@@ -421,11 +415,6 @@ public class ExecutionVertex implements Serializable {
 				if (grp != null) {
 					this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
 				}
-				
-				if (operatorState != null) {
-					execution.setOperatorState(operatorState);
-				}
-				
 			}
 			else {
 				throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
@@ -524,6 +513,7 @@ public class ExecutionVertex implements Serializable {
 		// clear the unnecessary fields in this class
 		this.resultPartitions = null;
 		this.inputEdges = null;
+		this.locationConstraint = null;
 		this.locationConstraintInstances = null;
 	}
 
@@ -588,10 +578,13 @@ public class ExecutionVertex implements Serializable {
 
 	/**
 	 * Creates a task deployment descriptor to deploy a subtask to the given target slot.
+	 * 
+	 * TODO: This should actually be in the EXECUTION
 	 */
 	TaskDeploymentDescriptor createDeploymentDescriptor(
 			ExecutionAttemptID executionId,
-			SimpleSlot targetSlot) {
+			SimpleSlot targetSlot,
+			SerializedValue<StateHandle<?>> operatorState) {
 
 		// Produced intermediate results
 		List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 6d895f9..28fa78e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -38,9 +38,18 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 
 /**
- * A job graph represents an entire Flink runtime job.
+ * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
+ * All programs from higher level APIs are transformed into JobGraphs.
+ * 
+ * <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
+ * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
+ * but inside certain special vertices that establish the feedback channel amongst themselves.</p>
+ * 
+ * <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result
+ * define the characteristics of the concrete operation and intermediate data.</p>
  */
 public class JobGraph implements Serializable {
 
@@ -74,11 +83,12 @@ public class JobGraph implements Serializable {
 	/** flag to enable queued scheduling */
 	private boolean allowQueuedScheduling;
 
+	/** The mode in which the job is scheduled */
 	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 	
-	private boolean checkpointingEnabled = false;
+	/** The settings for asynchronous snapshotting */
+	private JobSnapshottingSettings snapshotSettings;
 	
-	private long checkpointingInterval = 10000;
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -258,20 +268,24 @@ public class JobGraph implements Serializable {
 		return this.taskVertices.size();
 	}
 
-	public void setCheckpointingEnabled(boolean checkpointingEnabled) {
-		this.checkpointingEnabled = checkpointingEnabled;
-	}
-
-	public boolean isCheckpointingEnabled() {
-		return checkpointingEnabled;
-	}
-
-	public void setCheckpointingInterval(long checkpointingInterval) {
-		this.checkpointingInterval = checkpointingInterval;
+	/**
+	 * Sets the settings for asynchronous snapshots. A value of {@code null} means that
+	 * snapshotting is not enabled.
+	 *
+	 * @param settings The snapshot settings, or null, to disable snapshotting.
+	 */
+	public void setSnapshotSettings(JobSnapshottingSettings settings) {
+		this.snapshotSettings = settings;
 	}
 
-	public long getCheckpointingInterval() {
-		return checkpointingInterval;
+	/**
+	 * Gets the settings for asynchronous snapshots. This method returns null, when
+	 * snapshotting is not enabled.
+	 * 
+	 * @return The snapshot settings, or null, if snapshotting is not enabled.
+	 */
+	public JobSnapshottingSettings getSnapshotSettings() {
+		return snapshotSettings;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 445c842..1cf5db2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -40,7 +40,7 @@ public abstract class AbstractInvokable {
 
 
 	/** The environment assigned to this invokable. */
-	private volatile Environment environment;
+	private Environment environment;
 
 	/** The execution config, cached from the deserialization from the JobConfiguration */
 	private ExecutionConfig executionConfig;
@@ -66,14 +66,14 @@ public abstract class AbstractInvokable {
 	 * @param environment
 	 *        the environment of this task
 	 */
-	public final void setEnvironment(final Environment environment) {
+	public final void setEnvironment(Environment environment) {
 		this.environment = environment;
 	}
 
 	/**
 	 * Returns the environment of this task.
 	 * 
-	 * @return the environment of this task or <code>null</code> if the environment has not yet been set
+	 * @return The environment of this task.
 	 */
 	public Environment getEnvironment() {
 		return this.environment;

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
new file mode 100644
index 0000000..69cb1f8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+public interface CheckpointCommittingOperator {
+	
+	void confirmCheckpoint(long checkpointId, long timestamp);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
new file mode 100644
index 0000000..d07b07e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+public interface CheckpointedOperator {
+	
+	void triggerCheckpoint(long checkpointId, long timestamp);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
new file mode 100644
index 0000000..86c9b60
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.List;
+
+/**
+ * The JobSnapshottingSettings are attached to a JobGraph and describe the settings
+ * for the asynchronous snapshotting of the JobGraph, such as interval, and which vertices
+ * need to participate.
+ */
+public class JobSnapshottingSettings implements java.io.Serializable{
+	
+	private static final long serialVersionUID = -2593319571078198180L;
+
+	/** The default time in which pending checkpoints need to be acknowledged before timing out */
+	public static final long DEFAULT_SNAPSHOT_TIMEOUT = 10 * 60 * 1000; // 10 minutes
+	
+	private final List<JobVertexID> verticesToTrigger;
+
+	private final List<JobVertexID> verticesToAcknowledge;
+
+	private final List<JobVertexID> verticesToConfirm;
+	
+	private final long checkpointInterval;
+	
+	private final long checkpointTimeout;
+
+
+	public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
+									List<JobVertexID> verticesToAcknowledge,
+									List<JobVertexID> verticesToConfirm,
+									long checkpointInterval)
+	{
+		this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm, checkpointInterval, DEFAULT_SNAPSHOT_TIMEOUT);
+	}
+	
+	public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
+									List<JobVertexID> verticesToAcknowledge,
+									List<JobVertexID> verticesToConfirm,
+									long checkpointInterval, long checkpointTimeout)
+	{
+		this.verticesToTrigger = verticesToTrigger;
+		this.verticesToAcknowledge = verticesToAcknowledge;
+		this.verticesToConfirm = verticesToConfirm;
+		this.checkpointInterval = checkpointInterval;
+		this.checkpointTimeout = checkpointTimeout;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	public List<JobVertexID> getVerticesToTrigger() {
+		return verticesToTrigger;
+	}
+	
+	public List<JobVertexID> getVerticesToAcknowledge() {
+		return verticesToAcknowledge;
+	}
+
+	public List<JobVertexID> getVerticesToConfirm() {
+		return verticesToConfirm;
+	}
+
+	public long getCheckpointInterval() {
+		return checkpointInterval;
+	}
+
+	public long getCheckpointTimeout() {
+		return checkpointTimeout;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return String.format("SnapshotSettings: interval=%d, timeout=%d, trigger=%s, ack=%s, commit=%s",
+				checkpointInterval, checkpointTimeout, verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
index 670dc3f..576edb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -21,12 +21,18 @@ package org.apache.flink.runtime.jobgraph.tasks;
 import org.apache.flink.runtime.state.StateHandle;
 
 /**
- * This is an interface meant to be implemented by any invokable that has to support state recovery.
- * It is mainly used by the TaskManager to identify operators that support state recovery in order 
- * to inject their initial state upon creation.
+ * This interface must be implemented by any invokable that has recoverable state.
+ * The method {@link #setInitialState(org.apache.flink.runtime.state.StateHandle)} is used
+ * to set the initial state of the operator, upon recovery.
  */
-public interface OperatorStateCarrier {
+public interface OperatorStateCarrier<T extends StateHandle<?>> {
 
-	public void injectState(StateHandle stateHandle);
+	/**
+	 * Sets the initial state of the operator, upon recovery. The initial state is typically
+	 * a snapshot of the state from a previous execution.
+	 * 
+	 * @param stateHandle The handle to the state.
+	 */
+	public void setInitialState(T stateHandle);
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java
deleted file mode 100644
index 0493ba6..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AbortCheckpoint.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-
-/**
- * This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the
- * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a task that the checkpoint
- * has been confirmed and that the task can commit the checkpoint to the outside world.
- */
-public class AbortCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
-
-	private static final long serialVersionUID = 2094094662279578953L;
-
-	public AbortCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
-		super(job, taskExecutionId, checkpointId);
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public boolean equals(Object o) {
-		return this == o || ( (o instanceof AbortCheckpoint) && super.equals(o));
-	}
-
-	@Override
-	public String toString() {
-		return String.format("AbortCheckpoint %d for (%s/%s)", 
-				getCheckpointId(), getJob(), getTaskExecutionId());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index dd94e37..db12e0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.messages.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
 
 /**
  * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
@@ -33,18 +34,19 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 
 	private static final long serialVersionUID = -7606214777192401493L;
 	
-	private final StateHandle state;
+	private final SerializedValue<StateHandle<?>> state;
 
 	public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
 		this(job, taskExecutionId, checkpointId, null);
 	}
 
-	public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, StateHandle state) {
+	public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId,
+									SerializedValue<StateHandle<?>> state) {
 		super(job, taskExecutionId, checkpointId);
 		this.state = state;
 	}
 
-	public StateHandle getState() {
+	public SerializedValue<StateHandle<?>> getState() {
 		return state;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
index cdfd202..d3a4374 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
@@ -30,15 +30,39 @@ public class ConfirmCheckpoint extends AbstractCheckpointMessage implements java
 
 	private static final long serialVersionUID = 2094094662279578953L;
 
-	public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+	/** The timestamp associated with the checkpoint */
+	private final long timestamp;
+	
+	public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
 		super(job, taskExecutionId, checkpointId);
+		this.timestamp = timestamp;
 	}
 
 	// --------------------------------------------------------------------------------------------
 
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));
+	}
+
 	@Override
 	public boolean equals(Object o) {
-		return this == o || ( (o instanceof ConfirmCheckpoint) && super.equals(o));
+		if (this == o) {
+			return true;
+		}
+		else if (o instanceof ConfirmCheckpoint) {
+			ConfirmCheckpoint that = (ConfirmCheckpoint) o;
+			return this.timestamp == that.timestamp && super.equals(o);
+		}
+		else {
+			return false;
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index 98712b5..f47b054 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -18,36 +18,23 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
 import java.util.Map;
 
 /**
- * A StateHandle that includes a copy of the state itself. This state handle is recommended for 
- * cases where the operatorState is lightweight enough to pass throughout the network.
- *
- * State is kept in a byte[] because it may contain userclasses, which akka is not able to handle.
+ * A StateHandle that includes a map of operator states directly.
  */
-public class LocalStateHandle implements StateHandle{
+public class LocalStateHandle implements StateHandle<Map<String, OperatorState<?>>> {
+
+	private static final long serialVersionUID = 2093619217898039610L;
 	
-	transient private Map<String, OperatorState<?>> stateMap;
-	private final byte[] state;
+	private final Map<String, OperatorState<?>> stateMap;
 
-	public LocalStateHandle(Map<String,OperatorState<?>> state) throws IOException {
+	public LocalStateHandle(Map<String,OperatorState<?>> state) {
 		this.stateMap = state;
-		this.state = InstantiationUtil.serializeObject(state);
 	}
 
 	@Override
-	public Map<String,OperatorState<?>> getState(ClassLoader usercodeClassloader) {
-		if(stateMap == null) {
-			try {
-				stateMap = (Map<String, OperatorState<?>>) InstantiationUtil.deserializeObject(this.state, usercodeClassloader);
-			} catch (Exception e) {
-				throw new RuntimeException("Error while deserializing the state", e);
-			}
-		}
+	public Map<String,OperatorState<?>> getState() {
 		return stateMap;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
index 1852ce8..409383c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
@@ -19,20 +19,19 @@
 package org.apache.flink.runtime.state;
 
 import java.io.Serializable;
-import java.util.Map;
 
 /**
  * StateHandle is a general handle interface meant to abstract operator state fetching. 
  * A StateHandle implementation can for example include the state itself in cases where the state 
  * is lightweight or fetching it lazily from some external storage when the state is too large.
  */
-public interface StateHandle extends Serializable {
+public interface StateHandle<T> extends Serializable {
 
 	/**
-	 * getState should retrieve and return the state managed the handle. 
+	 * This retrieves and return the state represented by the handle. 
 	 * 
-	 * @return
+	 * @return The state represented by the handle.
+	 * @throws java.lang.Exception Thrown, if the state cannot be fetched.
 	 */
-	public Map<String,OperatorState<?>> getState(ClassLoader userClassloader);
-	
+	T getState() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
new file mode 100644
index 0000000..2cdfef3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
+
+/**
+ * A collection of utility methods for dealing with operator state.
+ */
+public class StateUtils {
+
+	/**
+	 * Utility method to define a common generic bound to be used for setting a generic state
+	 * handle on a generic state carrier.
+	 * 
+	 * This has no impact on runtime, since internally, it performs
+	 * unchecked casts. The purpose is merely to allow the use of generic interfaces without resorting
+	 * to raw types, by giving the compiler a common type bound. 
+	 * 
+	 * @param op The state carrier operator.
+	 * @param state The state handle.
+	 * @param <T> Type bound for the  
+	 */
+	public static <T extends StateHandle<?>> void setOperatorState(OperatorStateCarrier<?> op, StateHandle<?> state) {
+		@SuppressWarnings("unchecked")
+		OperatorStateCarrier<T> typedOp = (OperatorStateCarrier<T>) op;
+		@SuppressWarnings("unchecked")
+		T typedHandle = (T) state;
+
+		typedOp.setInitialState(typedHandle);
+	}
+	
+	
+	// ------------------------------------------------------------------------
+	
+	/** Do not instantiate */
+	private StateUtils() {}
+}