You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/10 15:00:10 UTC

[10/14] flink git commit: [FLINK-1638] [streaming] Add StateHandle and include javadoc

[FLINK-1638] [streaming] Add StateHandle and include javadoc

This closes #459


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2b5c21d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2b5c21d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2b5c21d

Branch: refs/heads/master
Commit: f2b5c21da6a297f20ffad99e9f26ccb0a9491881
Parents: 490fa70
Author: Paris Carbone <se...@gmail.com>
Authored: Mon Mar 9 14:58:30 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../deployment/TaskDeploymentDescriptor.java    | 13 ++--
 .../flink/runtime/executiongraph/Execution.java | 13 ++--
 .../runtime/executiongraph/ExecutionGraph.java  |  6 +-
 .../runtime/executiongraph/ExecutionVertex.java | 29 ++++----
 .../jobgraph/tasks/BarrierTransceiver.java      | 16 ++++-
 .../jobgraph/tasks/OperatorStateCarrier.java    | 15 ++--
 .../flink/runtime/state/LocalStateHandle.java   | 41 +++++++++++
 .../apache/flink/runtime/state/StateHandle.java | 40 +++++++++++
 .../StreamCheckpointCoordinator.scala           | 76 +++++++++++++-------
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../api/streamvertex/StreamVertex.java          |  9 ++-
 11 files changed, 191 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index b2573f7..6993248 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -23,12 +23,11 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -79,7 +78,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/** The list of JAR files required to run this task. */
 	private final List<BlobKey> requiredJarFiles;
 	
-	private Map<String, OperatorState<?>> operatorStates;
+	private StateHandle operatorStates;
 
 	/**
 	 * Constructs a task deployment descriptor.
@@ -129,13 +128,13 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			Configuration taskConfiguration, String invokableClassName,
 			List<PartitionDeploymentDescriptor> producedPartitions,
 			List<PartitionConsumerDeploymentDescriptor> consumedPartitions,
-			List<BlobKey> requiredJarFiles, int targetSlotNumber, Map<String,OperatorState<?>> operatorStates) {
+			List<BlobKey> requiredJarFiles, int targetSlotNumber, StateHandle operatorStates) {
 
 		this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
 				jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
 				consumedPartitions, requiredJarFiles, targetSlotNumber);
 		
-		setOperatorStates(operatorStates);
+		setOperatorState(operatorStates);
 	}
 
 	/**
@@ -244,11 +243,11 @@ public final class TaskDeploymentDescriptor implements Serializable {
 				strProducedPartitions, strConsumedPartitions);
 	}
 
-	public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) {
+	public void setOperatorState(StateHandle operatorStates) {
 		this.operatorStates = operatorStates;
 	}
 
-	public Map<String, OperatorState<?>> getOperatorStates() {
+	public StateHandle getOperatorStates() {
 		return operatorStates;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 93845c7..cf24b20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,7 +46,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 
@@ -56,7 +56,6 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeoutException;
@@ -124,7 +123,7 @@ public class Execution implements Serializable {
 	
 	private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
 	
-	private Map<String,OperatorState<?>> operatorStates;
+	private StateHandle operatorState;
 
 	// --------------------------------------------------------------------------------------------
 	
@@ -858,11 +857,11 @@ public class Execution implements Serializable {
 				(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
 	}
 
-	public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) {
-		this.operatorStates = operatorStates;
+	public void setOperatorState(StateHandle operatorStates) {
+		this.operatorState = operatorStates;
 	}
 
-	public Map<String,OperatorState<?>> getOperatorStates() {
-		return operatorStates;
+	public StateHandle getOperatorState() {
+		return operatorState;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0c6c3a7..c319a5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
@@ -567,9 +567,9 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 	
-	public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> states)
+	public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , StateHandle> states)
 	{
-		for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> state : states.entrySet())
+		for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , StateHandle> state : states.entrySet())
 		{
 			tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 41d34d5..24bcf21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.runtime.deployment.PartialPartitionInfo;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.deployment.PartialPartitionInfo;
 import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartitionInfo;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
@@ -38,9 +38,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 import org.slf4j.Logger;
-
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -48,13 +47,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
-import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
 /**
  * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of
@@ -91,7 +86,7 @@ public class ExecutionVertex implements Serializable {
 	
 	private volatile boolean scheduleLocalOnly;
 	
-	private Map<String,OperatorState<?>> operatorState;
+	private StateHandle operatorState;
 	
 	// --------------------------------------------------------------------------------------------
 
@@ -199,11 +194,11 @@ public class ExecutionVertex implements Serializable {
 		return currentExecution.getAssignedResourceLocation();
 	}
 
-	public void setOperatorState(Map<String,OperatorState<?>> operatorState) {
+	public void setOperatorState(StateHandle operatorState) {
 		this.operatorState = operatorState;
 	}
 
-	public Map<String,OperatorState<?>> getOperatorState() {
+	public StateHandle getOperatorState() {
 		return operatorState;
 	}
 	
@@ -382,7 +377,8 @@ public class ExecutionVertex implements Serializable {
 			Execution execution = currentExecution;
 			ExecutionState state = execution.getState();
 
-			if (state == FINISHED || state == CANCELED || state == FAILED) {
+			if (state == ExecutionState.FINISHED || state == ExecutionState.CANCELED
+					|| state == ExecutionState.FAILED) {
 				priorExecutions.add(execution);
 				currentExecution = new Execution(this, execution.getAttemptNumber()+1,
 						System.currentTimeMillis(), timeout);
@@ -394,7 +390,7 @@ public class ExecutionVertex implements Serializable {
 				
 				if(operatorState!=null)
 				{
-					execution.setOperatorStates(operatorState);
+					execution.setOperatorState(operatorState);
 				}
 				
 			}
@@ -440,8 +436,9 @@ public class ExecutionVertex implements Serializable {
 		ExecutionState state = execution.getState();
 
 		// sanity check
-		if (!(state == FINISHED || state == CANCELED || state == FAILED)) {
-			throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state.");
+		if (!(state == ExecutionState.FINISHED || state == ExecutionState.CANCELED || state == ExecutionState.FAILED)) {
+			throw new IllegalStateException(
+					"Cannot archive ExecutionVertex that is not in a finished state.");
 		}
 		
 		// prepare the current execution for archiving

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
index c56da62..0a8642e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
@@ -18,10 +18,24 @@
 package org.apache.flink.runtime.jobgraph.tasks;
 
 
+/**
+ * A BarrierTransceiver describes an operator's barrier checkpointing behavior used for 
+ * fault tolerance. In the most common case [[broadcastBarrier]] is being expected to be called 
+ * periodically upon receiving a checkpoint barrier. Furthermore, a [[confirmBarrier]] method should
+ * be implemented and used for acknowledging a specific checkpoint checkpoint.
+ */
 public interface BarrierTransceiver {
 
+	/**
+	 * A callback for notifying an operator of a new checkpoint barrier.
+	 * @param barrierID
+	 */
 	public void broadcastBarrier(long barrierID);
-	
+
+	/**
+	 * A callback for confirming that a barrier checkpoint is complete
+	 * @param barrierID
+	 */
 	public void confirmBarrier(long barrierID);
 	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
index e8b6d6b..670dc3f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.runtime.jobgraph.tasks;
 
-import org.apache.flink.runtime.state.OperatorState;
-
-import java.util.Map;
+import org.apache.flink.runtime.state.StateHandle;
 
+/**
+ * This is an interface meant to be implemented by any invokable that has to support state recovery.
+ * It is mainly used by the TaskManager to identify operators that support state recovery in order 
+ * to inject their initial state upon creation.
+ */
 public interface OperatorStateCarrier {
-	
-	public void injectStates(Map<String, OperatorState<?>> state);
-	
+
+	public void injectState(StateHandle stateHandle);
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
new file mode 100644
index 0000000..ac40bf8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+
+import java.util.Map;
+
+/**
+ * A StateHandle that includes a copy of the state itself. This state handle is recommended for 
+ * cases where the operatorState is lightweight enough to pass throughout the network. 
+ * 
+ */
+public class LocalStateHandle implements StateHandle{
+	
+	private final Map<String, OperatorState<?>>  state;
+
+	public LocalStateHandle(Map<String,OperatorState<?>> state) {
+		this.state = state;
+	}
+
+	@Override
+	public Map<String,OperatorState<?>> getState() {
+		return state;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
new file mode 100644
index 0000000..ddc8038
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * StateHandle is a general handle interface meant to abstract operator state fetching. 
+ * A StateHandle implementation can for example include the state itself in cases where the state 
+ * is lightweight or fetching it lazily from some external storage when the state is too large.
+ * 
+ */
+public interface StateHandle extends Serializable{
+
+	/**
+	 * getState should retrieve and return the state managed the handle. 
+	 * 
+	 * @return
+	 */
+	public Map<String,OperatorState<?>> getState();
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index 7ab6a6f..fee69b5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -26,38 +26,47 @@ import org.apache.flink.runtime.execution.ExecutionState.RUNNING
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
 import org.apache.flink.runtime.jobgraph.JobStatus._
 import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
-import org.apache.flink.runtime.state.OperatorState
+import org.apache.flink.runtime.state.StateHandle
 
 import scala.collection.JavaConversions._
 import scala.collection.immutable.TreeMap
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.{FiniteDuration, _}
 
-object StreamCheckpointCoordinator {
-
-  def spawn(context: ActorContext,executionGraph: ExecutionGraph,
-            interval: FiniteDuration = 5 seconds): ActorRef = {
-
-    val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
-    val monitor = context.system.actorOf(Props(new StreamCheckpointCoordinator(executionGraph,
-      vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
-              List.empty[Long])).toMap, Map() ,interval,0L,-1L)))
-    monitor ! InitBarrierScheduler
-    monitor
-  }
-
-  private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = {
-    for((_,execJobVertex) <- executionGraph.getAllVertices;
-        execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
-    yield execVertex
-  }
-}
+/**
+ * The StreamCheckpointCoordinator is responsible for operator state management and checkpoint
+ * coordination in streaming jobs. It periodically sends checkpoint barriers to the sources of a
+ * running job and constantly collects acknowledgements from operators while the barriers are being 
+ * disseminated throughout the execution graph. Upon time intervals it finds the last globally
+ * acknowledged checkpoint barrier to be used for a consistent recovery and loads all associated 
+ * state handles to the respected execution vertices.
+ * 
+ * The following messages describe this actor's expected behavior: 
+ *
+ *  - [[InitBarrierScheduler]] initiates the actor and schedules the periodic [[BarrierTimeout]] 
+ *  and [[CompactAndUpdate]] messages that are used for maintaining the state checkpointing logic. 
+ *
+ *  - [[BarrierTimeout]] is periodically triggered upon initiation in order to start a new 
+ *  checkpoint barrier. That is when the barriers are being disseminated to the source vertices.
+ *
+ *  - [[BarrierAck]] is being sent by each operator upon the completion of a state checkpoint. All
+ *  such acknowledgements are being collected and inspected upon [[CompactAndUpdate]] handling in
+ *  order to find out the last consistent checkpoint.
+ *  
+ *  - [[StateBarrierAck]] describes an acknowledgement such as the case of a [[BarrierAck]] that 
+ *  additionally carries operatorState with it.
+ *
+ * - [[CompactAndUpdate]] marks the last globally consistent checkpoint barrier to be used for 
+ * recovery purposes and removes all older states and acknowledgements up to that barrier.
+ * Furthermore, it updates the current ExecutionGraph with the current operator state handles 
+ * 
+ */
 
 class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
                          val vertices: Iterable[ExecutionVertex],
                          var acks: Map[(JobVertexID,Int),List[Long]],
                          var states: Map[(JobVertexID, Integer, Long), 
-                                 java.util.Map[String,OperatorState[_]]],
+                                 StateHandle],
                          val interval: FiniteDuration,var curId: Long,var ackId: Long)
         extends Actor with ActorLogMessages with ActorLogging {
   
@@ -95,8 +104,6 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
           }
           log.debug(acks.toString)
       
-      
-      
     case CompactAndUpdate =>
       val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
       => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1)))
@@ -108,7 +115,26 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
       executionGraph.loadOperatorStates(states)
       
   }
-  
+}
+
+object StreamCheckpointCoordinator {
+
+  def spawn(context: ActorContext,executionGraph: ExecutionGraph,
+            interval: FiniteDuration = 5 seconds): ActorRef = {
+
+    val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
+    val monitor = context.system.actorOf(Props(new StreamCheckpointCoordinator(executionGraph,
+      vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
+              List.empty[Long])).toMap, Map() ,interval,0L,-1L)))
+    monitor ! InitBarrierScheduler
+    monitor
+  }
+
+  private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = {
+    for((_,execJobVertex) <- executionGraph.getAllVertices;
+        execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
+    yield execVertex
+  }
 }
 
 case class BarrierTimeout()
@@ -122,7 +148,7 @@ case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long)
 case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long)
 
 case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer,
-                           checkpointID: Long, states: java.util.Map[String,OperatorState[_]])
+                           checkpointID: Long, states: StateHandle)
        
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3de917b..53c45ce 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -443,7 +443,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
       {
         val vertex = task.getEnvironment.getInvokable match {
           case opStateCarrier: OperatorStateCarrier =>
-            opStateCarrier.injectStates(tdd.getOperatorStates)
+            opStateCarrier.injectState(tdd.getOperatorStates)
         }
       }
       

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index eb0d6ed..3548712 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.jobmanager.BarrierAck;
 import org.apache.flink.runtime.jobmanager.StateBarrierAck;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.StreamConfig;
@@ -112,7 +114,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 		if (configuration.getStateMonitoring() && !states.isEmpty()) {
 			getEnvironment().getJobManager().tell(
 					new StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
-							.getJobVertexId(), context.getIndexOfThisSubtask(), barrierID, states),
+							.getJobVertexId(), context.getIndexOfThisSubtask(), barrierID, 
+							new LocalStateHandle(states)),
 					ActorRef.noSender());
 		} else {
 			getEnvironment().getJobManager().tell(
@@ -284,8 +287,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	 * Re-injects the user states into the map
 	 */
 	@Override
-	public void injectStates(Map<String, OperatorState<?>> states) {
-		this.states.putAll(states);
+	public void injectState(StateHandle stateHandle) {
+		this.states.putAll(stateHandle.getState());
 	}
 
 	private class SuperstepEventListener implements EventListener<TaskEvent> {