You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:24 UTC

[17/50] [abbrv] flink git commit: [Flink-5892] Restore state on operator level

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationV2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationV2.java
deleted file mode 100644
index 83c188c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationV2.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.checkpoint;
-
-//import com.google.common.collect.Lists;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.TaskStateHandles;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * This class encapsulates the operation of assigning restored state when restoring from a checkpoint that works on the
- * granularity of operators. This is the case for checkpoints that were created either with a Flink version >= 1.3 or
- * 1.2 if the savepoint only contains {@link SubtaskState}s for which the length of contained
- * {@link ChainedStateHandle}s is equal to 1.
- */
-public class StateAssignmentOperationV2 {
-
-	private final Logger logger;
-	private final Map<JobVertexID, ExecutionJobVertex> tasks;
-	private final Map<JobVertexID, TaskState> taskStates;
-	private final boolean allowNonRestoredState;
-
-	public StateAssignmentOperationV2(
-			Logger logger,
-			Map<JobVertexID, ExecutionJobVertex> tasks,
-			Map<JobVertexID, TaskState> taskStates,
-			boolean allowNonRestoredState) {
-
-		this.logger = Preconditions.checkNotNull(logger);
-		this.tasks = Preconditions.checkNotNull(tasks);
-		this.taskStates = Preconditions.checkNotNull(taskStates);
-		this.allowNonRestoredState = allowNonRestoredState;
-	}
-
-	public boolean assignStates() throws Exception {
-		Map<JobVertexID, TaskState> localStates = new HashMap<>(taskStates);
-		Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
-
-		Set<JobVertexID> allOperatorIDs = new HashSet<>();
-		for (ExecutionJobVertex executionJobVertex : tasks.values()) {
-			//allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
-		}
-		for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) {
-			TaskState taskState = taskGroupStateEntry.getValue();
-			//----------------------------------------find operator for state---------------------------------------------
-
-			if (!allOperatorIDs.contains(taskGroupStateEntry.getKey())) {
-				if (allowNonRestoredState) {
-					logger.info("Skipped checkpoint state for operator {}.", taskState.getJobVertexID());
-					continue;
-				} else {
-					throw new IllegalStateException("There is no operator for the state " + taskState.getJobVertexID());
-				}
-			}
-		}
-
-		for (Map.Entry<JobVertexID, ExecutionJobVertex> task : localTasks.entrySet()) {
-			final ExecutionJobVertex executionJobVertex = task.getValue();
-
-			// find the states of all operators belonging to this task
-			JobVertexID[] operatorIDs = null;//executionJobVertex.getOperatorIDs();
-			JobVertexID[] altOperatorIDs = null;//executionJobVertex.getUserDefinedOperatorIDs();
-			List<TaskState> operatorStates = new ArrayList<>();
-			boolean statelessTask = true;
-			for (int x = 0; x < operatorIDs.length; x++) {
-				JobVertexID operatorID = altOperatorIDs[x] == null
-					? operatorIDs[x]
-					: altOperatorIDs[x];
-
-				TaskState operatorState = localStates.remove(operatorID);
-				if (operatorState == null) {
-					operatorState = new TaskState(
-						operatorID,
-						executionJobVertex.getParallelism(),
-						executionJobVertex.getMaxParallelism(),
-						1);
-				} else {
-					statelessTask = false;
-				}
-				operatorStates.add(operatorState);
-			}
-			if (statelessTask) { // skip tasks where no operator has any state
-				continue;
-			}
-
-			assignAttemptState(task.getValue(), operatorStates);
-		}
-
-		return true;
-	}
-
-	private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<TaskState> operatorStates) {
-
-		JobVertexID[] operatorIDs = null;//executionJobVertex.getOperatorIDs();
-
-		//1. first compute the new parallelism
-		checkParallelismPreconditions(operatorStates, executionJobVertex);
-
-		int newParallelism = executionJobVertex.getParallelism();
-
-		List<KeyGroupRange> keyGroupPartitions = null;//StateAssignmentOperationUtils.createKeyGroupPartitions(
-			//executionJobVertex.getMaxParallelism(),
-			//newParallelism);
-
-		//2. Redistribute the operator state.
-		/**
-		 *
-		 * Redistribute ManagedOperatorStates and RawOperatorStates from old parallelism to new parallelism.
-		 *
-		 * The old ManagedOperatorStates with old parallelism 3:
-		 *
-		 * 		parallelism0 parallelism1 parallelism2
-		 * op0   states0,0    state0,1	   state0,2
-		 * op1
-		 * op2   states2,0    state2,1	   state1,2
-		 * op3   states3,0    state3,1     state3,2
-		 *
-		 * The new ManagedOperatorStates with new parallelism 4:
-		 *
-		 * 		parallelism0 parallelism1 parallelism2 parallelism3
-		 * op0   state0,0	  state0,1 	   state0,2		state0,3
-		 * op1
-		 * op2   state2,0	  state2,1 	   state2,2		state2,3
-		 * op3   state3,0	  state3,1 	   state3,2		state3,3
-		 */
-		List<List<Collection<OperatorStateHandle>>> newManagedOperatorStates = new ArrayList<>();
-		List<List<Collection<OperatorStateHandle>>> newRawOperatorStates = new ArrayList<>();
-
-		reDistributePartitionableStates(operatorStates, newParallelism, newManagedOperatorStates, newRawOperatorStates);
-
-
-		//3. Compute TaskStateHandles of every subTask in the executionJobVertex
-		/**
-		 *  An executionJobVertex's all state handles needed to restore are something like a matrix
-		 *
-		 * 		parallelism0 parallelism1 parallelism2 parallelism3
-		 * op0   sh(0,0)     sh(0,1)       sh(0,2)	    sh(0,3)
-		 * op1   sh(1,0)	 sh(1,1)	   sh(1,2)	    sh(1,3)
-		 * op2   sh(2,0)	 sh(2,1)	   sh(2,2)		sh(2,3)
-		 * op3   sh(3,0)	 sh(3,1)	   sh(3,2)		sh(3,3)
-		 *
-		 * we will compute the state handles column by column.
-		 *
-		 */
-		for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
-
-			Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[subTaskIndex]
-				.getCurrentExecutionAttempt();
-
-			List<StreamStateHandle> subNonPartitionableState = new ArrayList<>();
-
-			Tuple2<Collection<KeyedStateHandle>, Collection<KeyedStateHandle>> subKeyedState = null;
-
-			List<Collection<OperatorStateHandle>> subManagedOperatorState = new ArrayList<>();
-			List<Collection<OperatorStateHandle>> subRawOperatorState = new ArrayList<>();
-
-
-			for (int operatorIndex = 0; operatorIndex < operatorIDs.length; operatorIndex++) {
-				TaskState operatorState = operatorStates.get(operatorIndex);
-				int oldParallelism = operatorState.getParallelism();
-
-				// NonPartitioned State
-
-				reAssignSubNonPartitionedStates(
-					operatorState,
-					subTaskIndex,
-					newParallelism,
-					oldParallelism,
-					subNonPartitionableState);
-
-				// PartitionedState
-				reAssignSubPartitionableState(newManagedOperatorStates,
-					newRawOperatorStates,
-					subTaskIndex,
-					operatorIndex,
-					subManagedOperatorState,
-					subRawOperatorState);
-
-				// KeyedState
-				if (operatorIndex == operatorIDs.length - 1) {
-					subKeyedState = reAssignSubKeyedStates(operatorState,
-						keyGroupPartitions,
-						subTaskIndex,
-						newParallelism,
-						oldParallelism);
-
-				}
-			}
-
-
-			// check if a stateless task
-			if (!allElementsAreNull(subNonPartitionableState) ||
-				!allElementsAreNull(subManagedOperatorState) ||
-				!allElementsAreNull(subRawOperatorState) ||
-				subKeyedState != null) {
-
-				TaskStateHandles taskStateHandles = new TaskStateHandles(
-
-					new ChainedStateHandle<>(subNonPartitionableState),
-					subManagedOperatorState,
-					subRawOperatorState,
-					subKeyedState != null ? subKeyedState.f0 : null,
-					subKeyedState != null ? subKeyedState.f1 : null);
-
-				currentExecutionAttempt.setInitialState(taskStateHandles);
-			}
-		}
-	}
-
-
-	public void checkParallelismPreconditions(List<TaskState> operatorStates, ExecutionJobVertex executionJobVertex) {
-
-		for (TaskState taskState : operatorStates) {
-			//StateAssignmentOperation.checkParallelismPreconditions(taskState, executionJobVertex, this.logger);
-		}
-	}
-
-
-	private void reAssignSubPartitionableState(
-			List<List<Collection<OperatorStateHandle>>> newMangedOperatorStates,
-			List<List<Collection<OperatorStateHandle>>> newRawOperatorStates,
-			int subTaskIndex, int operatorIndex,
-			List<Collection<OperatorStateHandle>> subManagedOperatorState,
-			List<Collection<OperatorStateHandle>> subRawOperatorState) {
-
-		if (newMangedOperatorStates.get(operatorIndex) != null) {
-			subManagedOperatorState.add(newMangedOperatorStates.get(operatorIndex).get(subTaskIndex));
-		} else {
-			subManagedOperatorState.add(null);
-		}
-		if (newRawOperatorStates.get(operatorIndex) != null) {
-			subRawOperatorState.add(newRawOperatorStates.get(operatorIndex).get(subTaskIndex));
-		} else {
-			subRawOperatorState.add(null);
-		}
-
-
-	}
-
-	private Tuple2<Collection<KeyedStateHandle>, Collection<KeyedStateHandle>> reAssignSubKeyedStates(
-			TaskState operatorState,
-			List<KeyGroupRange> keyGroupPartitions,
-			int subTaskIndex,
-			int newParallelism,
-			int oldParallelism) {
-
-		Collection<KeyedStateHandle> subManagedKeyedState;
-		Collection<KeyedStateHandle> subRawKeyedState;
-
-		if (newParallelism == oldParallelism) {
-			if (operatorState.getState(subTaskIndex) != null) {
-				KeyedStateHandle oldSubManagedKeyedState = operatorState.getState(subTaskIndex).getManagedKeyedState();
-				KeyedStateHandle oldSubRawKeyedState = operatorState.getState(subTaskIndex).getRawKeyedState();
-				subManagedKeyedState = oldSubManagedKeyedState != null ? Collections.singletonList(
-					oldSubManagedKeyedState) : null;
-				subRawKeyedState = oldSubRawKeyedState != null ? Collections.singletonList(
-					oldSubRawKeyedState) : null;
-			} else {
-				subManagedKeyedState = null;
-				subRawKeyedState = null;
-			}
-		} else {
-			subManagedKeyedState = getManagedKeyedStateHandles(operatorState, keyGroupPartitions.get(subTaskIndex));
-			subRawKeyedState = getRawKeyedStateHandles(operatorState, keyGroupPartitions.get(subTaskIndex));
-		}
-		if (subManagedKeyedState == null && subRawKeyedState == null) {
-			return null;
-		}
-		return new Tuple2<>(subManagedKeyedState, subRawKeyedState);
-	}
-
-
-	private <X> boolean allElementsAreNull(List<X> nonPartitionableStates) {
-		for (Object streamStateHandle : nonPartitionableStates) {
-			if (streamStateHandle != null) {
-				return false;
-			}
-		}
-		return true;
-	}
-
-
-	private void reAssignSubNonPartitionedStates(
-			TaskState operatorState,
-			int subTaskIndex,
-			int newParallelism,
-			int oldParallelism,
-			List<StreamStateHandle> subNonPartitionableState) {
-		if (oldParallelism == newParallelism) {
-			if (operatorState.getState(subTaskIndex) != null &&
-				!operatorState.getState(subTaskIndex).getLegacyOperatorState().isEmpty()) {
-				subNonPartitionableState.add(operatorState.getState(subTaskIndex).getLegacyOperatorState().get(0));
-			} else {
-				subNonPartitionableState.add(null);
-			}
-		} else {
-			subNonPartitionableState.add(null);
-		}
-	}
-
-	private void reDistributePartitionableStates(
-			List<TaskState> operatorStates, int newParallelism,
-			List<List<Collection<OperatorStateHandle>>> newManagedOperatorStates,
-			List<List<Collection<OperatorStateHandle>>> newRawOperatorStates) {
-
-		//collect the old partitionalbe state
-		List<List<OperatorStateHandle>> oldManagedOperatorStates = new ArrayList<>();
-		List<List<OperatorStateHandle>> oldRawOperatorStates = new ArrayList<>();
-
-		collectPartionableStates(operatorStates, oldManagedOperatorStates, oldRawOperatorStates);
-
-
-		//redistribute
-		OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
-
-		for (int operatorIndex = 0; operatorIndex < operatorStates.size(); operatorIndex++) {
-			int oldParallelism = operatorStates.get(operatorIndex).getParallelism();
-			//newManagedOperatorStates.add(StateAssignmentOperationUtils.applyRepartitioner(opStateRepartitioner,
-			//	oldManagedOperatorStates.get(operatorIndex), oldParallelism, newParallelism));
-			//newRawOperatorStates.add(StateAssignmentOperationUtils.applyRepartitioner(opStateRepartitioner,
-			//	oldRawOperatorStates.get(operatorIndex), oldParallelism, newParallelism));
-
-		}
-	}
-
-
-	private void collectPartionableStates(
-			List<TaskState> operatorStates,
-			List<List<OperatorStateHandle>> managedOperatorStates,
-			List<List<OperatorStateHandle>> rawOperatorStates) {
-
-		for (TaskState operatorState : operatorStates) {
-			List<OperatorStateHandle> managedOperatorState = null;
-			List<OperatorStateHandle> rawOperatorState = null;
-
-			for (int i = 0; i < operatorState.getParallelism(); i++) {
-				SubtaskState subtaskState = operatorState.getState(i);
-				if (subtaskState != null) {
-					if (subtaskState.getManagedOperatorState() != null &&
-						subtaskState.getManagedOperatorState().getLength() > 0 &&
-						subtaskState.getManagedOperatorState().get(0) != null) {
-						if (managedOperatorState == null) {
-							managedOperatorState = new ArrayList<>();
-						}
-						managedOperatorState.add(subtaskState.getManagedOperatorState().get(0));
-					}
-
-					if (subtaskState.getRawOperatorState() != null &&
-						subtaskState.getRawOperatorState().getLength() > 0 &&
-						subtaskState.getRawOperatorState().get(0) != null) {
-						if (rawOperatorState == null) {
-							rawOperatorState = new ArrayList<>();
-						}
-						rawOperatorState.add(subtaskState.getRawOperatorState().get(0));
-					}
-				}
-
-			}
-			managedOperatorStates.add(managedOperatorState);
-			rawOperatorStates.add(rawOperatorState);
-		}
-	}
-
-
-	/**
-	 * Collect {@link KeyGroupsStateHandle  managedKeyedStateHandles} which have intersection with given
-	 * {@link KeyGroupRange} from {@link TaskState operatorState}
-	 *
-	 * @param operatorState        all state handles of a operator
-	 * @param subtaskKeyGroupRange the KeyGroupRange of a subtask
-	 * @return all managedKeyedStateHandles which have intersection with given KeyGroupRange
-	 */
-	public static List<KeyedStateHandle> getManagedKeyedStateHandles(
-			TaskState operatorState,
-			KeyGroupRange subtaskKeyGroupRange) {
-
-		List<KeyedStateHandle> subtaskKeyedStateHandles = null;
-
-		for (int i = 0; i < operatorState.getParallelism(); i++) {
-			if (operatorState.getState(i) != null && operatorState.getState(i).getManagedKeyedState() != null) {
-				KeyedStateHandle intersectedKeyedStateHandle = operatorState.getState(i).getManagedKeyedState().getIntersection(subtaskKeyGroupRange);
-
-				if (intersectedKeyedStateHandle != null) {
-					if (subtaskKeyedStateHandles == null) {
-						subtaskKeyedStateHandles = new ArrayList<>();
-					}
-					subtaskKeyedStateHandles.add(intersectedKeyedStateHandle);
-				}
-			}
-		}
-
-		return subtaskKeyedStateHandles;
-	}
-
-	/**
-	 * Collect {@link KeyGroupsStateHandle  rawKeyedStateHandles} which have intersection with given
-	 * {@link KeyGroupRange} from {@link TaskState operatorState}
-	 *
-	 * @param operatorState        all state handles of a operator
-	 * @param subtaskKeyGroupRange the KeyGroupRange of a subtask
-	 * @return all rawKeyedStateHandles which have intersection with given KeyGroupRange
-	 */
-	public static List<KeyedStateHandle> getRawKeyedStateHandles(
-			TaskState operatorState,
-			KeyGroupRange subtaskKeyGroupRange) {
-
-		List<KeyedStateHandle> subtaskKeyedStateHandles = null;
-
-		for (int i = 0; i < operatorState.getParallelism(); i++) {
-			if (operatorState.getState(i) != null && operatorState.getState(i).getRawKeyedState() != null) {
-				KeyedStateHandle intersectedKeyedStateHandle = operatorState.getState(i).getRawKeyedState().getIntersection(subtaskKeyGroupRange);
-
-				if (intersectedKeyedStateHandle != null) {
-					if (subtaskKeyedStateHandles == null) {
-						subtaskKeyedStateHandles = new ArrayList<>();
-					}
-					subtaskKeyedStateHandles.add(intersectedKeyedStateHandle);
-				}
-			}
-		}
-
-		return subtaskKeyedStateHandles;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 4f5f536..aa5c516 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -34,7 +34,11 @@ import java.util.Objects;
  * tasks of a {@link org.apache.flink.runtime.jobgraph.JobVertex}.
  *
  * This class basically groups all non-partitioned state and key-group state belonging to the same job vertex together.
+ *
+ * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
  */
+@Deprecated
+@SuppressWarnings("deprecation")
 public class TaskState implements CompositeStateHandle {
 
 	private static final long serialVersionUID = -4845578005863201810L;

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index 79ec596..a7cf4b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint.savepoint;
 import org.apache.flink.core.io.Versioned;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 
 import java.util.Collection;
@@ -54,8 +55,10 @@ public interface Savepoint extends Versioned {
 	 *
 	 * <p>These are used to restore the snapshot state.
 	 *
+	 * @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future.
 	 * @return Snapshotted task states
 	 */
+	@Deprecated
 	Collection<TaskState> getTaskStates();
 
 	/**
@@ -64,6 +67,15 @@ public interface Savepoint extends Versioned {
 	Collection<MasterState> getMasterStates();
 
 	/**
+	 * Returns the snapshotted operator states.
+	 *
+	 * <p>These are used to restore the snapshot state.
+	 *
+	 * @return Snapshotted operator states
+	 */
+	Collection<OperatorState> getOperatorStates();
+
+	/**
 	 * Disposes the savepoint.
 	 */
 	void dispose() throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 8ee38da..38db7c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -22,9 +22,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
 import org.slf4j.Logger;
@@ -67,32 +68,42 @@ public class SavepointLoader {
 		final Tuple2<Savepoint, StreamStateHandle> savepointAndHandle = 
 				SavepointStore.loadSavepointWithHandle(savepointPath, classLoader);
 
-		final Savepoint savepoint = savepointAndHandle.f0;
+		Savepoint savepoint = savepointAndHandle.f0;
 		final StreamStateHandle metadataHandle = savepointAndHandle.f1;
 
-		final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
+		if (savepoint.getTaskStates() != null) {
+			savepoint = SavepointV2.convertToOperatorStateSavepointV2(tasks, savepoint);
+		}
+		// generate mapping from operator to task
+		Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
+		for (ExecutionJobVertex task : tasks.values()) {
+			for (OperatorID operatorID : task.getOperatorIDs()) {
+				operatorToJobVertexMapping.put(operatorID, task);
+			}
+		}
 
+		// (2) validate it (parallelism, etc)
 		boolean expandedToLegacyIds = false;
 
-		// (2) validate it (parallelism, etc)
-		for (TaskState taskState : savepoint.getTaskStates()) {
+		HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>(savepoint.getOperatorStates().size());
+		for (OperatorState operatorState : savepoint.getOperatorStates()) {
 
-			ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
+			ExecutionJobVertex executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
 
 			// on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
 			// for example as generated from older flink versions, to provide backwards compatibility.
 			if (executionJobVertex == null && !expandedToLegacyIds) {
-				tasks = ExecutionJobVertex.includeLegacyJobVertexIDs(tasks);
-				executionJobVertex = tasks.get(taskState.getJobVertexID());
+				operatorToJobVertexMapping = ExecutionJobVertex.includeAlternativeOperatorIDs(operatorToJobVertexMapping);
+				executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
 				expandedToLegacyIds = true;
-				LOG.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
+				LOG.info("Could not find ExecutionJobVertex. Including user-defined OperatorIDs in search.");
 			}
 
 			if (executionJobVertex != null) {
 
-				if (executionJobVertex.getMaxParallelism() == taskState.getMaxParallelism()
+				if (executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism()
 						|| !executionJobVertex.isMaxParallelismConfigured()) {
-					taskStates.put(taskState.getJobVertexID(), taskState);
+					operatorStates.put(operatorState.getOperatorID(), operatorState);
 				} else {
 					String msg = String.format("Failed to rollback to savepoint %s. " +
 									"Max parallelism mismatch between savepoint state and new program. " +
@@ -100,21 +111,21 @@ public class SavepointLoader {
 									"max parallelism %d. This indicates that the program has been changed " +
 									"in a non-compatible way after the savepoint.",
 							savepoint,
-							taskState.getJobVertexID(),
-							taskState.getMaxParallelism(),
+							operatorState.getOperatorID(),
+							operatorState.getMaxParallelism(),
 							executionJobVertex.getMaxParallelism());
 
 					throw new IllegalStateException(msg);
 				}
 			} else if (allowNonRestoredState) {
-				LOG.info("Skipping savepoint state for operator {}.", taskState.getJobVertexID());
+				LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID());
 			} else {
 				String msg = String.format("Failed to rollback to savepoint %s. " +
 								"Cannot map savepoint state for operator %s to the new program, " +
 								"because the operator is not available in the new program. If " +
 								"you want to allow to skip this, you can set the --allowNonRestoredState " +
 								"option on the CLI.",
-						savepointPath, taskState.getJobVertexID());
+						savepointPath, operatorState.getOperatorID());
 
 				throw new IllegalStateException(msg);
 			}
@@ -122,8 +133,17 @@ public class SavepointLoader {
 
 		// (3) convert to checkpoint so the system can fall back to it
 		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
-		return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L,
-				taskStates, savepoint.getMasterStates(), props, metadataHandle, savepointPath);
+
+		return new CompletedCheckpoint(
+			jobId,
+			savepoint.getCheckpointId(),
+			0L,
+			0L,
+			operatorStates,
+			savepoint.getMasterStates(),
+			props,
+			metadataHandle,
+			savepointPath);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
index 196c870..daf5b7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.util.Preconditions;
 
@@ -68,6 +69,11 @@ public class SavepointV1 implements Savepoint {
 	}
 
 	@Override
+	public Collection<OperatorState> getOperatorStates() {
+		return null;
+	}
+
+	@Override
 	public void dispose() throws Exception {
 		// since checkpoints are never deserialized into this format,
 		// this method should never be called

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index ae9f4a9..aaa8cdd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -38,7 +37,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -97,7 +95,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 			}
 		}
 
-		return new SavepointV2(checkpointId, taskStates, Collections.<MasterState>emptyList());
+		return new SavepointV2(checkpointId, taskStates);
 	}
 
 	public void serializeOld(SavepointV1 savepoint, DataOutputStream dos) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
index 100982d..6a3b57f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -19,10 +19,23 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -38,20 +51,48 @@ public class SavepointV2 implements Savepoint {
 	/** The checkpoint ID */
 	private final long checkpointId;
 
-	/** The task states */
+	/**
+	 * The task states 
+	 * @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future. 
+	 */
+	@Deprecated
 	private final Collection<TaskState> taskStates;
 
+	/** The operator states */
+	private final Collection<OperatorState> operatorStates;
+
 	/** The states generated by the CheckpointCoordinator */
 	private final Collection<MasterState> masterStates;
 
-
+	/** @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future. */
+	@Deprecated
 	public SavepointV2(long checkpointId, Collection<TaskState> taskStates) {
-		this(checkpointId, taskStates, Collections.<MasterState>emptyList());
+		this(
+			checkpointId, 
+			null,
+			checkNotNull(taskStates, "taskStates"),
+			Collections.<MasterState>emptyList()
+		);
+	}
+
+	public SavepointV2(long checkpointId, Collection<OperatorState> operatorStates, Collection<MasterState> masterStates) {
+		this(
+			checkpointId,
+			checkNotNull(operatorStates, "operatorStates"),
+			null,
+			masterStates
+		);
 	}
 
-	public SavepointV2(long checkpointId, Collection<TaskState> taskStates, Collection<MasterState> masterStates) {
+	private SavepointV2(
+			long checkpointId,
+			Collection<OperatorState> operatorStates,
+			Collection<TaskState> taskStates,
+			Collection<MasterState> masterStates) {
+
 		this.checkpointId = checkpointId;
-		this.taskStates = checkNotNull(taskStates, "taskStates");
+		this.operatorStates = operatorStates;
+		this.taskStates = taskStates;
 		this.masterStates = checkNotNull(masterStates, "masterStates");
 	}
 
@@ -66,6 +107,11 @@ public class SavepointV2 implements Savepoint {
 	}
 
 	@Override
+	public Collection<OperatorState> getOperatorStates() {
+		return operatorStates;
+	}
+
+	@Override
 	public Collection<TaskState> getTaskStates() {
 		return taskStates;
 	}
@@ -77,10 +123,10 @@ public class SavepointV2 implements Savepoint {
 
 	@Override
 	public void dispose() throws Exception {
-		for (TaskState taskState : taskStates) {
-			taskState.discardState();
+		for (OperatorState operatorState : operatorStates) {
+			operatorState.discardState();
 		}
-		taskStates.clear();
+		operatorStates.clear();
 		masterStates.clear();
 	}
 
@@ -88,4 +134,97 @@ public class SavepointV2 implements Savepoint {
 	public String toString() {
 		return "Checkpoint Metadata (version=" + VERSION + ')';
 	}
+
+	/**
+	 * Converts the {@link Savepoint} containing {@link TaskState TaskStates} to an equivalent savepoint containing
+	 * {@link OperatorState OperatorStates}.
+	 *
+	 * @param savepoint savepoint to convert
+	 * @param tasks     map of all execution job vertices, keyed by their job vertex ids
+	 * @return converted savepoint; the given savepoint itself if it already contains operator states
+	 * @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future.
+	 * */
+	@Deprecated
+	public static Savepoint convertToOperatorStateSavepointV2(
+			Map<JobVertexID, ExecutionJobVertex> tasks,
+			Savepoint savepoint) {
+
+		if (savepoint.getOperatorStates() != null) {
+			return savepoint;
+		}
+
+		boolean expandedToLegacyIds = false;
+
+		Map<OperatorID, OperatorState> operatorStates = new HashMap<>(savepoint.getTaskStates().size() << 1);
+
+		for (TaskState taskState : savepoint.getTaskStates()) {
+			ExecutionJobVertex jobVertex = tasks.get(taskState.getJobVertexID());
+
+			// on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
+			// for example as generated from older flink versions, to provide backwards compatibility.
+			if (jobVertex == null && !expandedToLegacyIds) {
+				tasks = ExecutionJobVertex.includeLegacyJobVertexIDs(tasks);
+				jobVertex = tasks.get(taskState.getJobVertexID());
+				expandedToLegacyIds = true;
+			}
+
+			List<OperatorID> operatorIDs = jobVertex.getOperatorIDs();
+
+			for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) {
+				SubtaskState subtaskState = taskState.getState(subtaskIndex);
+
+				if (subtaskState == null) {
+					continue;
+				}
+
+				@SuppressWarnings("deprecation")
+				ChainedStateHandle<StreamStateHandle> nonPartitionedState =
+					subtaskState.getLegacyOperatorState();
+				ChainedStateHandle<OperatorStateHandle> partitioneableState =
+					subtaskState.getManagedOperatorState();
+				ChainedStateHandle<OperatorStateHandle> rawOperatorState =
+					subtaskState.getRawOperatorState();
+
+				for (int chainIndex = 0; chainIndex < taskState.getChainLength(); chainIndex++) {
+
+					// task consists of multiple operators so we have to break the state apart
+					for (int o = 0; o < operatorIDs.size(); o++) {
+						OperatorID operatorID = operatorIDs.get(o);
+						OperatorState operatorState = operatorStates.get(operatorID);
+
+						if (operatorState == null) {
+							operatorState = new OperatorState(
+								operatorID,
+								jobVertex.getParallelism(),
+								jobVertex.getMaxParallelism());
+							operatorStates.put(operatorID, operatorState);
+						}
+
+						KeyedStateHandle managedKeyedState = null;
+						KeyedStateHandle rawKeyedState = null;
+
+						// only the head operator retains the keyed state
+						if (o == operatorIDs.size() - 1) {
+							managedKeyedState = subtaskState.getManagedKeyedState();
+							rawKeyedState = subtaskState.getRawKeyedState();
+						}
+
+						OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(
+							nonPartitionedState != null ? nonPartitionedState.get(o) : null,
+							partitioneableState != null ? partitioneableState.get(o) : null,
+							rawOperatorState != null ? rawOperatorState.get(o) : null,
+							managedKeyedState,
+							rawKeyedState);
+
+						operatorState.putState(subtaskIndex, operatorSubtaskState);
+					}
+				}
+			}
+		}
+
+		return new SavepointV2(
+			savepoint.getCheckpointId(),
+			operatorStates.values(),
+			savepoint.getMasterStates());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index 307ea16..1b5f2c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -20,10 +20,9 @@ package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -97,25 +96,25 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 			serializeMasterState(ms, dos);
 		}
 
-		// third: task states
-		final Collection<TaskState> taskStates = checkpointMetadata.getTaskStates();
-		dos.writeInt(taskStates.size());
+		// third: operator states
+		Collection<OperatorState> operatorStates = checkpointMetadata.getOperatorStates();
+		dos.writeInt(operatorStates.size());
 
-		for (TaskState taskState : checkpointMetadata.getTaskStates()) {
-			// Vertex ID
-			dos.writeLong(taskState.getJobVertexID().getLowerPart());
-			dos.writeLong(taskState.getJobVertexID().getUpperPart());
+		for (OperatorState operatorState : operatorStates) {
+			// Operator ID
+			dos.writeLong(operatorState.getOperatorID().getLowerPart());
+			dos.writeLong(operatorState.getOperatorID().getUpperPart());
 
 			// Parallelism
-			int parallelism = taskState.getParallelism();
+			int parallelism = operatorState.getParallelism();
 			dos.writeInt(parallelism);
-			dos.writeInt(taskState.getMaxParallelism());
-			dos.writeInt(taskState.getChainLength());
+			dos.writeInt(operatorState.getMaxParallelism());
+			dos.writeInt(1);
 
 			// Sub task states
-			Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
+			Map<Integer, OperatorSubtaskState> subtaskStateMap = operatorState.getSubtaskStates();
 			dos.writeInt(subtaskStateMap.size());
-			for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
+			for (Map.Entry<Integer, OperatorSubtaskState> entry : subtaskStateMap.entrySet()) {
 				dos.writeInt(entry.getKey());
 				serializeSubtaskState(entry.getValue(), dos);
 			}
@@ -147,31 +146,32 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 			throw new IOException("invalid number of master states: " + numMasterStates);
 		}
 
-		// third: task states
-		final int numTaskStates = dis.readInt();
-		final ArrayList<TaskState> taskStates = new ArrayList<>(numTaskStates);
+		// third: operator states
+		int numTaskStates = dis.readInt();
+		List<OperatorState> operatorStates = new ArrayList<>(numTaskStates);
 
 		for (int i = 0; i < numTaskStates; i++) {
-			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
+			OperatorID jobVertexId = new OperatorID(dis.readLong(), dis.readLong());
 			int parallelism = dis.readInt();
 			int maxParallelism = dis.readInt();
 			int chainLength = dis.readInt();
 
 			// Add task state
-			TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism, chainLength);
-			taskStates.add(taskState);
+			OperatorState taskState = new OperatorState(jobVertexId, parallelism, maxParallelism);
+			operatorStates.add(taskState);
 
 			// Sub task states
 			int numSubTaskStates = dis.readInt();
 
 			for (int j = 0; j < numSubTaskStates; j++) {
 				int subtaskIndex = dis.readInt();
-				SubtaskState subtaskState = deserializeSubtaskState(dis);
+
+				OperatorSubtaskState subtaskState = deserializeSubtaskState(dis);
 				taskState.putState(subtaskIndex, subtaskState);
 			}
 		}
 
-		return new SavepointV2(checkpointId, taskStates, masterStates);
+		return new SavepointV2(checkpointId, operatorStates, masterStates);
 	}
 
 	// ------------------------------------------------------------------------
@@ -235,35 +235,32 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 	//  task state (de)serialization methods
 	// ------------------------------------------------------------------------
 
-	private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
+	private static void serializeSubtaskState(OperatorSubtaskState subtaskState, DataOutputStream dos) throws IOException {
 
 		dos.writeLong(-1);
 
-		ChainedStateHandle<StreamStateHandle> nonPartitionableState = subtaskState.getLegacyOperatorState();
+		StreamStateHandle nonPartitionableState = subtaskState.getLegacyOperatorState();
 
-		int len = nonPartitionableState != null ? nonPartitionableState.getLength() : 0;
+		int len = nonPartitionableState != null ? 1 : 0;
 		dos.writeInt(len);
-		for (int i = 0; i < len; ++i) {
-			StreamStateHandle stateHandle = nonPartitionableState.get(i);
-			serializeStreamStateHandle(stateHandle, dos);
+		if (len == 1) {
+			serializeStreamStateHandle(nonPartitionableState, dos);
 		}
 
-		ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState();
+		OperatorStateHandle operatorStateBackend = subtaskState.getManagedOperatorState();
 
-		len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0;
+		len = operatorStateBackend != null ? 1 : 0;
 		dos.writeInt(len);
-		for (int i = 0; i < len; ++i) {
-			OperatorStateHandle stateHandle = operatorStateBackend.get(i);
-			serializeOperatorStateHandle(stateHandle, dos);
+		if (len == 1) {
+			serializeOperatorStateHandle(operatorStateBackend, dos);
 		}
 
-		ChainedStateHandle<OperatorStateHandle> operatorStateFromStream = subtaskState.getRawOperatorState();
+		OperatorStateHandle operatorStateFromStream = subtaskState.getRawOperatorState();
 
-		len = operatorStateFromStream != null ? operatorStateFromStream.getLength() : 0;
+		len = operatorStateFromStream != null ? 1 : 0;
 		dos.writeInt(len);
-		for (int i = 0; i < len; ++i) {
-			OperatorStateHandle stateHandle = operatorStateFromStream.get(i);
-			serializeOperatorStateHandle(stateHandle, dos);
+		if (len == 1) {
+			serializeOperatorStateHandle(operatorStateFromStream, dos);
 		}
 
 		KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
@@ -273,49 +270,28 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 		serializeKeyedStateHandle(keyedStateStream, dos);
 	}
 
-	private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
+	private static OperatorSubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
 		// Duration field has been removed from SubtaskState
 		long ignoredDuration = dis.readLong();
 
 		int len = dis.readInt();
-		List<StreamStateHandle> nonPartitionableState = new ArrayList<>(len);
-		for (int i = 0; i < len; ++i) {
-			StreamStateHandle streamStateHandle = deserializeStreamStateHandle(dis);
-			nonPartitionableState.add(streamStateHandle);
-		}
-
+		StreamStateHandle nonPartitionableState = len == 0 ? null : deserializeStreamStateHandle(dis);
 
 		len = dis.readInt();
-		List<OperatorStateHandle> operatorStateBackend = new ArrayList<>(len);
-		for (int i = 0; i < len; ++i) {
-			OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
-			operatorStateBackend.add(streamStateHandle);
-		}
+		OperatorStateHandle operatorStateBackend = len == 0 ? null : deserializeOperatorStateHandle(dis);
 
 		len = dis.readInt();
-		List<OperatorStateHandle> operatorStateStream = new ArrayList<>(len);
-		for (int i = 0; i < len; ++i) {
-			OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
-			operatorStateStream.add(streamStateHandle);
-		}
+		OperatorStateHandle operatorStateStream = len == 0 ? null : deserializeOperatorStateHandle(dis);
 
 		KeyedStateHandle keyedStateBackend = deserializeKeyedStateHandle(dis);
 
 		KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis);
 
-		ChainedStateHandle<StreamStateHandle> nonPartitionableStateChain =
-				new ChainedStateHandle<>(nonPartitionableState);
-
-		ChainedStateHandle<OperatorStateHandle> operatorStateBackendChain =
-				new ChainedStateHandle<>(operatorStateBackend);
-
-		ChainedStateHandle<OperatorStateHandle> operatorStateStreamChain =
-				new ChainedStateHandle<>(operatorStateStream);
 
-		return new SubtaskState(
-				nonPartitionableStateChain,
-				operatorStateBackendChain,
-				operatorStateStreamChain,
+		return new OperatorSubtaskState(
+				nonPartitionableState,
+				operatorStateBackend,
+				operatorStateStream,
 				keyedStateBackend,
 				keyedStateStream);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 5fbce4d..2e5de64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -68,6 +69,24 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	private final ExecutionGraph graph;
 	
 	private final JobVertex jobVertex;
+
+	/**
+	 * The IDs of all operators contained in this execution job vertex.
+	 *
+	 * The IDs are stored in depth-first post-order; for the forking chain below the IDs would be stored as [D, E, B, C, A].
+	 *  A - B - D
+	 *   \    \
+	 *    C    E
+	 * This is the same order that operators are stored in the {@code StreamTask}.
+	 */
+	private final List<OperatorID> operatorIDs;
+
+	/**
+	 * The alternative IDs of all operators contained in this execution job vertex.
+	 *
+	 * The IDs are in the same order as {@link ExecutionJobVertex#operatorIDs}.
+	 */
+	private final List<OperatorID> userDefinedOperatorIds;
 	
 	private final ExecutionVertex[] taskVertices;
 
@@ -139,6 +158,8 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		this.serializedTaskInformation = null;
 
 		this.taskVertices = new ExecutionVertex[numTaskVertices];
+		this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
+		this.userDefinedOperatorIds = Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());
 		
 		this.inputs = new ArrayList<>(jobVertex.getInputs().size());
 		
@@ -214,6 +235,24 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		finishedSubtasks = new boolean[parallelism];
 	}
 
+	/**
+	 * Returns a list containing the IDs of all operators contained in this execution job vertex.
+	 *
+	 * @return list containing the IDs of all contained operators
+	 */
+	public List<OperatorID> getOperatorIDs() {
+		return operatorIDs;
+	}
+
+	/**
+	 * Returns a list containing the alternative IDs of all operators contained in this execution job vertex.
+	 *
+	 * @return list containing the alternative IDs of all contained operators
+	 */
+	public List<OperatorID> getUserDefinedOperatorIDs() {
+		return userDefinedOperatorIds;
+	}
+
 	public void setMaxParallelism(int maxParallelismDerived) {
 
 		Preconditions.checkState(!maxParallelismConfigured,
@@ -731,6 +770,30 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return expanded;
 	}
 
+	public static Map<OperatorID, ExecutionJobVertex> includeAlternativeOperatorIDs(
+			Map<OperatorID, ExecutionJobVertex> operatorMapping) {
+
+		Map<OperatorID, ExecutionJobVertex> expanded = new HashMap<>(2 * operatorMapping.size());
+		// first include all existing ids
+		expanded.putAll(operatorMapping);
+
+		// now expand and add user-defined ids
+		for (ExecutionJobVertex executionJobVertex : operatorMapping.values()) {
+			if (executionJobVertex != null) {
+				JobVertex jobVertex = executionJobVertex.getJobVertex();
+				if (jobVertex != null) {
+					for (OperatorID operatorID : jobVertex.getUserDefinedOperatorIDs()) {
+						if (operatorID != null) {
+							expanded.put(operatorID, executionJobVertex);
+						}
+					}
+				}
+			}
+		}
+
+		return expanded;
+	}
+
 	@Override
 	public ArchivedExecutionJobVertex archive() {
 		return new ArchivedExecutionJobVertex(this);

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index c4fc907..5627ac7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -39,8 +39,8 @@ public class InputFormatVertex extends JobVertex {
 		super(name, id);
 	}
 
-	public InputFormatVertex(String name, JobVertexID id, List<JobVertexID> alternativeIds) {
-		super(name, id, alternativeIds);
+	public InputFormatVertex(String name, JobVertexID id, List<JobVertexID> alternativeIds, List<OperatorID> operatorIds, List<OperatorID> alternativeOperatorIds) {
+		super(name, id, alternativeIds, operatorIds, alternativeOperatorIds);
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 1180db4..4f52895 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -50,8 +50,15 @@ public class JobVertex implements java.io.Serializable {
 	/** The ID of the vertex. */
 	private final JobVertexID id;
 
+	/** The alternative IDs of the vertex. */
 	private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();
 
+	/** The IDs of all operators contained in this vertex. */
+	private final ArrayList<OperatorID> operatorIDs = new ArrayList<>();
+
+	/** The alternative IDs of all operators contained in this vertex. */
+	private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>();
+
 	/** List of produced data sets, one per writer */
 	private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
 
@@ -125,6 +132,9 @@ public class JobVertex implements java.io.Serializable {
 	public JobVertex(String name, JobVertexID id) {
 		this.name = name == null ? DEFAULT_NAME : name;
 		this.id = id == null ? new JobVertexID() : id;
+		// the id lists must have the same size
+		this.operatorIDs.add(OperatorID.fromJobVertexID(this.id));
+		this.operatorIdsAlternatives.add(null);
 	}
 
 	/**
@@ -133,11 +143,16 @@ public class JobVertex implements java.io.Serializable {
 	 * @param name The name of the new job vertex.
 	 * @param primaryId The id of the job vertex.
 	 * @param alternativeIds The alternative ids of the job vertex.
+	 * @param operatorIds The ids of all operators contained in this job vertex.
+	 * @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex.
 	 */
-	public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds) {
+	public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds, List<OperatorID> operatorIds, List<OperatorID> alternativeOperatorIds) {
+		Preconditions.checkArgument(operatorIds.size() == alternativeOperatorIds.size());
 		this.name = name == null ? DEFAULT_NAME : name;
 		this.id = primaryId == null ? new JobVertexID() : primaryId;
 		this.idAlternatives.addAll(alternativeIds);
+		this.operatorIDs.addAll(operatorIds);
+		this.operatorIdsAlternatives.addAll(alternativeOperatorIds);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -196,6 +211,14 @@ public class JobVertex implements java.io.Serializable {
 		return this.inputs.size();
 	}
 
+	public List<OperatorID> getOperatorIDs() {
+		return operatorIDs;
+	}
+
+	public List<OperatorID> getUserDefinedOperatorIDs() {
+		return operatorIdsAlternatives;
+	}
+
 	/**
 	 * Returns the vertex's configuration object which can be used to pass custom settings to the task at runtime.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java
new file mode 100644
index 0000000..0e378de
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.util.AbstractID;
+
+/**
+ * A class for statistically unique operator IDs.
+ */
+public class OperatorID extends AbstractID {
+
+	private static final long serialVersionUID = 1L;
+
+	public OperatorID() {
+		super();
+	}
+
+	public OperatorID(byte[] bytes) {
+		super(bytes);
+	}
+
+	public OperatorID(long lowerPart, long upperPart) {
+		super(lowerPart, upperPart);
+	}
+
+	public static OperatorID fromJobVertexID(JobVertexID id) {
+		return new OperatorID(id.getLowerPart(), id.getUpperPart());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
index 9f94f2f..d293eea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
@@ -190,7 +191,9 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest {
 				false);
 
 		for (ExecutionVertex vertex : vertices) {
-			assertEquals(checkpoint.getTaskState(vertex.getJobvertexId()), loaded.getTaskState(vertex.getJobvertexId()));
+			for (OperatorID operatorID : vertex.getJobVertex().getOperatorIDs()) {
+				assertEquals(checkpoint.getOperatorStates().get(operatorID), loaded.getOperatorStates().get(operatorID));
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 90b7fe7..6e20be3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -25,10 +25,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -38,8 +41,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(PendingCheckpoint.class)
@@ -85,8 +90,26 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 		final long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
 		
 		SubtaskState subtaskState = mock(SubtaskState.class);
-		PowerMockito.when(subtaskState.getLegacyOperatorState()).thenReturn(null);
-		PowerMockito.when(subtaskState.getManagedOperatorState()).thenReturn(null);
+
+		StreamStateHandle legacyHandle = mock(StreamStateHandle.class);
+		ChainedStateHandle<StreamStateHandle> chainedLegacyHandle = mock(ChainedStateHandle.class);
+		when(chainedLegacyHandle.get(anyInt())).thenReturn(legacyHandle);
+		when(subtaskState.getLegacyOperatorState()).thenReturn(chainedLegacyHandle);
+
+		OperatorStateHandle managedHandle = mock(OperatorStateHandle.class);
+		ChainedStateHandle<OperatorStateHandle> chainedManagedHandle = mock(ChainedStateHandle.class);
+		when(chainedManagedHandle.get(anyInt())).thenReturn(managedHandle);
+		when(subtaskState.getManagedOperatorState()).thenReturn(chainedManagedHandle);
+
+		OperatorStateHandle rawHandle = mock(OperatorStateHandle.class);
+		ChainedStateHandle<OperatorStateHandle> chainedRawHandle = mock(ChainedStateHandle.class);
+		when(chainedRawHandle.get(anyInt())).thenReturn(rawHandle);
+		when(subtaskState.getRawOperatorState()).thenReturn(chainedRawHandle);
+
+		KeyedStateHandle managedKeyedHandle = mock(KeyedStateHandle.class);
+		when(subtaskState.getRawKeyedState()).thenReturn(managedKeyedHandle);
+		KeyedStateHandle managedRawHandle = mock(KeyedStateHandle.class);
+		when(subtaskState.getManagedKeyedState()).thenReturn(managedRawHandle);
 		
 		AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new CheckpointMetrics(), subtaskState);
 		
@@ -102,7 +125,11 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 		assertTrue(pendingCheckpoint.isDiscarded());
 
 		// make sure that the subtask state has been discarded after we could not complete it.
-		verify(subtaskState).discardState();
+		verify(subtaskState.getLegacyOperatorState().get(0)).discardState();
+		verify(subtaskState.getManagedOperatorState().get(0)).discardState();
+		verify(subtaskState.getRawOperatorState().get(0)).discardState();
+		verify(subtaskState.getManagedKeyedState()).discardState();
+		verify(subtaskState.getRawKeyedState()).discardState();
 	}
 
 	private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 7c271a7..d6daa4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 
@@ -228,7 +229,7 @@ public class CheckpointCoordinatorMasterHooksTest {
 
 		final CompletedCheckpoint checkpoint = new CompletedCheckpoint(
 				jid, checkpointId, 123L, 125L,
-				Collections.<JobVertexID, TaskState>emptyMap(),
+				Collections.<OperatorID, OperatorState>emptyMap(),
 				masterHookStates,
 				CheckpointProperties.forStandardCheckpoint(),
 				null,
@@ -282,7 +283,7 @@ public class CheckpointCoordinatorMasterHooksTest {
 
 		final CompletedCheckpoint checkpoint = new CompletedCheckpoint(
 				jid, checkpointId, 123L, 125L,
-				Collections.<JobVertexID, TaskState>emptyMap(),
+				Collections.<OperatorID, OperatorState>emptyMap(),
 				masterHookStates,
 				CheckpointProperties.forStandardCheckpoint(),
 				null,