Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/10 17:47:30 UTC

[2/2] flink git commit: [FLINK-5046] [tdd] Preserialize TaskDeploymentDescriptor information

[FLINK-5046] [tdd] Preserialize TaskDeploymentDescriptor information

In order to speed up the serialization of the TaskDeploymentDescriptor, we can pre-serialize
all information that stays the same across TaskDeploymentDescriptors. The information that
is static for a TDD is the job-related information contained in the ExecutionGraph and the
operator/task-related information stored in the ExecutionJobVertex.
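
Conceptually, the change pays the serialization cost once per job/operator and then reuses
the resulting bytes for every per-subtask descriptor. A minimal sketch of that idea, using
Flink's SerializedValue (the payload type and the helper method below are illustrative
stand-ins, not the actual Flink classes):

    import org.apache.flink.util.SerializedValue;

    public class PreserializationSketch {

        public static void main(String[] args) throws Exception {
            // Hypothetical payload standing in for the job-wide information.
            String sharedInfo = "job-wide settings";

            // Pay the serialization cost exactly once ...
            SerializedValue<String> serialized = new SerializedValue<>(sharedInfo);

            // ... and reuse the same serialized value for every subtask,
            // instead of re-serializing the identical data per descriptor.
            for (int subtaskIndex = 0; subtaskIndex < 4; subtaskIndex++) {
                createDescriptor(serialized, subtaskIndex);
            }
        }

        // Stand-in for creating one TaskDeploymentDescriptor per subtask.
        private static void createDescriptor(SerializedValue<String> info, int subtaskIndex) {
            System.out.println("descriptor for subtask " + subtaskIndex + " reuses " + info);
        }
    }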

In order to pre-serialize this information, this PR introduces the JobInformation class
and the TaskInformation class, which are stored in serialized form in the ExecutionGraph
and the ExecutionJobVertex, respectively.
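
On the receiving side, a TaskDeploymentDescriptor then carries only these two serialized
blobs. A hedged sketch of how a receiver might unpack them via the accessors added in this
commit (the wrapper class, method, and class loader parameter are illustrative; error
handling is elided):

    import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
    import org.apache.flink.runtime.executiongraph.JobInformation;
    import org.apache.flink.runtime.executiongraph.TaskInformation;

    final class DeserializationSketch {

        // Recover the shared objects from a received descriptor; a real
        // receiver would pass the job's user-code class loader.
        static void unpack(TaskDeploymentDescriptor tdd, ClassLoader userCodeClassLoader)
                throws Exception {

            JobInformation jobInformation =
                tdd.getSerializedJobInformation().deserializeValue(userCodeClassLoader);
            TaskInformation taskInformation =
                tdd.getSerializedTaskInformation().deserializeValue(userCodeClassLoader);

            // The two objects now expose what the descriptor previously stored
            // directly: job id/name, configurations, invokable class name,
            // parallelism, required JAR blob keys, classpaths, etc.
        }
    }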

This closes #2779.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58204da1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58204da1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58204da1

Branch: refs/heads/master
Commit: 58204da13d42c265d6a503a8cf738b6522e12ba6
Parents: 86a3dd5
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 9 19:11:36 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 10 18:47:04 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/client/JobClient.java  |   6 +-
 .../deployment/TaskDeploymentDescriptor.java    | 279 ++++---------------
 .../flink/runtime/executiongraph/Execution.java |   4 +-
 .../runtime/executiongraph/ExecutionGraph.java  | 103 ++++---
 .../executiongraph/ExecutionGraphBuilder.java   |  33 ++-
 .../executiongraph/ExecutionJobVertex.java      |  37 ++-
 .../runtime/executiongraph/ExecutionVertex.java |  28 +-
 .../runtime/executiongraph/JobInformation.java  |  96 +++++++
 .../runtime/executiongraph/TaskInformation.java |  91 ++++++
 .../groups/TaskManagerJobMetricGroup.java       |  32 ++-
 .../metrics/groups/TaskManagerMetricGroup.java  |  31 ++-
 .../apache/flink/runtime/taskmanager/Task.java  | 114 +++++---
 .../runtime/messages/JobManagerMessages.scala   |   4 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  51 +++-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   2 +
 .../TaskDeploymentDescriptorTest.java           |  50 ++--
 .../executiongraph/AllVerticesIteratorTest.java |   8 +-
 .../ArchivedExecutionGraphTest.java             |   4 +
 .../ExecutionGraphConstructionTest.java         |  59 +++-
 .../ExecutionGraphDeploymentTest.java           |  30 +-
 .../executiongraph/PointwisePatternTest.java    |  22 ++
 .../executiongraph/VertexSlotSharingTest.java   |   9 +-
 .../metrics/groups/TaskManagerGroupTest.java    |  91 +++++-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  42 ++-
 .../runtime/taskmanager/TaskManagerTest.java    | 120 +++++---
 .../flink/runtime/taskmanager/TaskStopTest.java |  35 ++-
 .../flink/runtime/taskmanager/TaskTest.java     |  82 +++---
 .../testingUtils/TestingTaskManagerLike.scala   |  18 +-
 .../tasks/InterruptSensitiveRestoreTest.java    | 104 +++----
 .../streaming/runtime/tasks/StreamTaskTest.java |  69 +++--
 .../test/checkpointing/SavepointITCase.java     |   9 +-
 31 files changed, 1025 insertions(+), 638 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 106947d..132d74c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -55,7 +55,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
-import java.util.List;
+import java.util.Collection;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -211,8 +211,8 @@ public class JobClient {
 			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
 			final BlobCache blobClient = new BlobCache(serverAddress, config);
 
-			final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
-			final List<URL> requiredClasspaths = props.requiredClasspaths();
+			final Collection<BlobKey> requiredJarFiles = props.requiredJarFiles();
+			final Collection<URL> requiredClasspaths = props.requiredClasspaths();
 
 			final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index bf31e51..cf6987d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -18,23 +18,15 @@
 
 package org.apache.flink.runtime.deployment;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.Serializable;
-import java.net.URL;
 import java.util.Collection;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A task deployment descriptor contains all the information necessary to deploy a task on a task manager.
@@ -43,207 +35,92 @@ public final class TaskDeploymentDescriptor implements Serializable {
 
 	private static final long serialVersionUID = -3233562176034358530L;
 
-	/** The ID of the job the tasks belongs to. */
-	private final JobID jobID;
-	private final String jobName;
+	/** Serialized job information */
+	private final SerializedValue<JobInformation> serializedJobInformation;
 
-	/** The task's job vertex ID. */
-	private final JobVertexID vertexID;
+	/** Serialized task information */
+	private final SerializedValue<TaskInformation> serializedTaskInformation;
 
 	/** The ID referencing the attempt to execute the task. */
 	private final ExecutionAttemptID executionId;
 
-	/** The task's name. */
-	private final String taskName;
-
-	/** The number of key groups aka the max parallelism aka the max number of subtasks. */
-	private final int numberOfKeyGroups;
-
 	/** The task's index in the subtask group. */
-	private final int indexInSubtaskGroup;
-
-	/** The number of sub tasks. */
-	private final int numberOfSubtasks;
+	private final int subtaskIndex;
 
 	/** Attempt number of the task */
 	private final int attemptNumber;
 
-	/** The configuration of the job the task belongs to. */
-	private final Configuration jobConfiguration;
-
-	/** The task's configuration object. */
-	private final Configuration taskConfiguration;
-
-	/** The name of the class containing the task code to be executed. */
-	private final String invokableClassName;
-
 	/** The list of produced intermediate result partition deployment descriptors. */
-	private final List<ResultPartitionDeploymentDescriptor> producedPartitions;
+	private final Collection<ResultPartitionDeploymentDescriptor> producedPartitions;
 
 	/** The list of consumed intermediate result partitions. */
-	private final List<InputGateDeploymentDescriptor> inputGates;
+	private final Collection<InputGateDeploymentDescriptor> inputGates;
 
+	/** Slot number in which to run the sub task on the target machine */
 	private final int targetSlotNumber;
 
-	/** The list of JAR files required to run this task. */
-	private final List<BlobKey> requiredJarFiles;
-	
-	/** The list of classpaths required to run this task. */
-	private final List<URL> requiredClasspaths;
-
+	/** State handles for the sub task */
 	private final TaskStateHandles taskStateHandles;
 
-	/** The execution configuration (see {@link ExecutionConfig}) related to the specific job. */
-	private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
-
-	/**
-	 * Constructs a task deployment descriptor.
-	 */
 	public TaskDeploymentDescriptor(
-		JobID jobID,
-		String jobName,
-		JobVertexID vertexID,
-		ExecutionAttemptID executionId,
-		SerializedValue<ExecutionConfig> serializedExecutionConfig,
-		String taskName,
-		int numberOfKeyGroups,
-		int indexInSubtaskGroup,
-		int numberOfSubtasks,
-		int attemptNumber,
-		Configuration jobConfiguration,
-		Configuration taskConfiguration,
-		String invokableClassName,
-		List<ResultPartitionDeploymentDescriptor> producedPartitions,
-		List<InputGateDeploymentDescriptor> inputGates,
-		List<BlobKey> requiredJarFiles,
-		List<URL> requiredClasspaths,
-		int targetSlotNumber,
-		TaskStateHandles taskStateHandles) {
-
-		checkArgument(indexInSubtaskGroup >= 0);
-		checkArgument(numberOfSubtasks > indexInSubtaskGroup);
-		checkArgument(targetSlotNumber >= 0);
-		checkArgument(attemptNumber >= 0);
-
-		this.jobID = checkNotNull(jobID);
-		this.jobName = checkNotNull(jobName);
-		this.vertexID = checkNotNull(vertexID);
-		this.executionId = checkNotNull(executionId);
-		this.serializedExecutionConfig = checkNotNull(serializedExecutionConfig);
-		this.taskName = checkNotNull(taskName);
-		this.numberOfKeyGroups = numberOfKeyGroups;
-		this.indexInSubtaskGroup = indexInSubtaskGroup;
-		this.numberOfSubtasks = numberOfSubtasks;
+			SerializedValue<JobInformation> serializedJobInformation,
+			SerializedValue<TaskInformation> serializedTaskInformation,
+			ExecutionAttemptID executionAttemptId,
+			int subtaskIndex,
+			int attemptNumber,
+			int targetSlotNumber,
+			TaskStateHandles taskStateHandles,
+			Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
+			Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {
+
+		this.serializedJobInformation = Preconditions.checkNotNull(serializedJobInformation);
+		this.serializedTaskInformation = Preconditions.checkNotNull(serializedTaskInformation);
+		this.executionId = Preconditions.checkNotNull(executionAttemptId);
+
+		Preconditions.checkArgument(0 <= subtaskIndex, "The subtask index must be non-negative.");
+		this.subtaskIndex = subtaskIndex;
+
+		Preconditions.checkArgument(0 <= attemptNumber, "The attempt number must be non-negative.");
 		this.attemptNumber = attemptNumber;
-		this.jobConfiguration = checkNotNull(jobConfiguration);
-		this.taskConfiguration = checkNotNull(taskConfiguration);
-		this.invokableClassName = checkNotNull(invokableClassName);
-		this.producedPartitions = checkNotNull(producedPartitions);
-		this.inputGates = checkNotNull(inputGates);
-		this.requiredJarFiles = checkNotNull(requiredJarFiles);
-		this.requiredClasspaths = checkNotNull(requiredClasspaths);
+
+		Preconditions.checkArgument(0 <= targetSlotNumber, "The target slot number must be non-negative.");
 		this.targetSlotNumber = targetSlotNumber;
-		this.taskStateHandles = taskStateHandles;
-	}
 
-	public TaskDeploymentDescriptor(
-		JobID jobID,
-		String jobName,
-		JobVertexID vertexID,
-		ExecutionAttemptID executionId,
-		SerializedValue<ExecutionConfig> serializedExecutionConfig,
-		String taskName,
-		int numberOfKeyGroups,
-		int indexInSubtaskGroup,
-		int numberOfSubtasks,
-		int attemptNumber,
-		Configuration jobConfiguration,
-		Configuration taskConfiguration,
-		String invokableClassName,
-		List<ResultPartitionDeploymentDescriptor> producedPartitions,
-		List<InputGateDeploymentDescriptor> inputGates,
-		List<BlobKey> requiredJarFiles,
-		List<URL> requiredClasspaths,
-		int targetSlotNumber) {
-
-		this(
-			jobID,
-			jobName,
-			vertexID,
-			executionId,
-			serializedExecutionConfig,
-			taskName,
-			numberOfKeyGroups,
-			indexInSubtaskGroup,
-			numberOfSubtasks,
-			attemptNumber,
-			jobConfiguration,
-			taskConfiguration,
-			invokableClassName,
-			producedPartitions,
-			inputGates,
-			requiredJarFiles,
-			requiredClasspaths,
-			targetSlotNumber,
-			null);
-	}
+		this.taskStateHandles = taskStateHandles;
 
-	/**
-	 * Returns the execution configuration (see {@link ExecutionConfig}) related to the
-	 * specific job.
-	 */
-	public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
-		return serializedExecutionConfig;
+		this.producedPartitions = Preconditions.checkNotNull(resultPartitionDeploymentDescriptors);
+		this.inputGates = Preconditions.checkNotNull(inputGateDeploymentDescriptors);
 	}
 
 	/**
-	 * Returns the ID of the job the tasks belongs to.
+	 * Returns the sub task's serialized job information.
+	 *
+	 * @return serialized job information
 	 */
-	public JobID getJobID() {
-		return jobID;
+	public SerializedValue<JobInformation> getSerializedJobInformation() {
+		return serializedJobInformation;
 	}
-	
-	public String getJobName() { return jobName; }
 
 	/**
-	 * Returns the task's execution vertex ID.
+	 * Returns the sub task's serialized task information.
+	 *
+	 * @return serialized task information
 	 */
-	public JobVertexID getVertexID() {
-		return vertexID;
+	public SerializedValue<TaskInformation> getSerializedTaskInformation() {
+		return serializedTaskInformation;
 	}
 
-	public ExecutionAttemptID getExecutionId() {
+	public ExecutionAttemptID getExecutionAttemptId() {
 		return executionId;
 	}
 
 	/**
-	 * Returns the task's name.
-	 */
-	public String getTaskName() {
-		return taskName;
-	}
-
-	/**
-	 * Returns the task's number of key groups.
-	 */
-	public int getNumberOfKeyGroups() {
-		return numberOfKeyGroups;
-	}
-
-	/**
 	 * Returns the task's index in the subtask group.
 	 *
 	 * @return the task's index in the subtask group
 	 */
-	public int getIndexInSubtaskGroup() {
-		return indexInSubtaskGroup;
-	}
-
-	/**
-	 * Returns the current number of subtasks.
-	 */
-	public int getNumberOfSubtasks() {
-		return numberOfSubtasks;
+	public int getSubtaskIndex() {
+		return subtaskIndex;
 	}
 
 	/**
@@ -254,13 +131,6 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	}
 
 	/**
-	 * Returns the {@link TaskInfo} object for the subtask
-	 */
-	public TaskInfo getTaskInfo() {
-		return new TaskInfo(taskName, numberOfKeyGroups, indexInSubtaskGroup, numberOfSubtasks, attemptNumber);
-	}
-
-	/**
 	 * Gets the number of the slot into which the task is to be deployed.
 	 *
 	 * @return The number of the target slot.
@@ -269,68 +139,39 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		return targetSlotNumber;
 	}
 
-	/**
-	 * Returns the configuration of the job the task belongs to.
-	 */
-	public Configuration getJobConfiguration() {
-		return jobConfiguration;
-	}
-
-	/**
-	 * Returns the task's configuration object.
-	 */
-	public Configuration getTaskConfiguration() {
-		return taskConfiguration;
-	}
-
-	/**
-	 * Returns the name of the class containing the task code to be executed.
-	 */
-	public String getInvokableClassName() {
-		return invokableClassName;
-	}
-
-	public List<ResultPartitionDeploymentDescriptor> getProducedPartitions() {
+	public Collection<ResultPartitionDeploymentDescriptor> getProducedPartitions() {
 		return producedPartitions;
 	}
 
-	public List<InputGateDeploymentDescriptor> getInputGates() {
+	public Collection<InputGateDeploymentDescriptor> getInputGates() {
 		return inputGates;
 	}
 
-	public List<BlobKey> getRequiredJarFiles() {
-		return requiredJarFiles;
-	}
-
-	public List<URL> getRequiredClasspaths() {
-		return requiredClasspaths;
+	public TaskStateHandles getTaskStateHandles() {
+		return taskStateHandles;
 	}
 
 	@Override
 	public String toString() {
-		return String.format("TaskDeploymentDescriptor [job id: %s, job vertex id: %s, " +
-						"execution id: %s, task name: %s (%d/%d), attempt: %d, invokable: %s, " +
-						"produced partitions: %s, input gates: %s]",
-				jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
-				attemptNumber, invokableClassName, collectionToString(producedPartitions),
-				collectionToString(inputGates));
+		return String.format("TaskDeploymentDescriptor [execution id: %s, attempt: %d, " +
+				"produced partitions: %s, input gates: %s]",
+			executionId,
+			attemptNumber,
+			collectionToString(producedPartitions),
+			collectionToString(inputGates));
 	}
 
-	private String collectionToString(Collection<?> collection) {
+	private static String collectionToString(Iterable<?> collection) {
 		final StringBuilder strBuilder = new StringBuilder();
 
 		strBuilder.append("[");
 
 		for (Object elem : collection) {
-			strBuilder.append(elem.toString());
+			strBuilder.append(elem);
 		}
 
 		strBuilder.append("]");
 
 		return strBuilder.toString();
 	}
-
-	public TaskStateHandles getTaskStateHandles() {
-		return taskStateHandles;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c0ce799..219d71d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -372,7 +372,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				@Override
 				public Void apply(Throwable failure) {
 					if (failure instanceof TimeoutException) {
-						String taskname = deployment.getTaskInfo().getTaskNameWithSubtasks() + " (" + attemptId + ')';
+						String taskname = vertex.getTaskName() + '(' +
+							(getParallelSubtaskIndex() + 1) + '/' +
+							vertex.getTotalNumberOfParallelSubtasks() + ") (" + attemptId + ')';
 
 						markFailed(new Exception(
 							"Cannot deploy task " + taskname + " - TaskManager (" + assignedResourceLocation

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1231b45..f8e894a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -122,14 +122,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * within the job. */
 	private final SerializableObject progressLock = new SerializableObject();
 
-	/** The ID of the job this graph has been built for. */
-	private final JobID jobID;
+	/** Job specific information like the job id, job name, job configuration, etc. */
+	private final JobInformation jobInformation;
 
-	/** The name of the original job graph. */
-	private final String jobName;
-
-	/** The job configuration that was originally attached to the JobGraph. */
-	private final Configuration jobConfiguration;
+	/** Serialized version of the job specific information. This is done to avoid multiple
+	 * serializations of the same data when creating a TaskDeploymentDescriptor.
+	 */
+	private final SerializedValue<JobInformation> serializedJobInformation;
 
 	/** {@code true} if all source tasks are stoppable. */
 	private boolean isStoppable = true;
@@ -146,14 +145,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** The currently executed tasks, for callbacks */
 	private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
 
-	/** A list of all libraries required during the job execution. Libraries have to be stored
-	 * inside the BlobService and are referenced via the BLOB keys. */
-	private final List<BlobKey> requiredJarFiles;
-
-	/** A list of all classpaths required during the job execution. Classpaths have to be
-	 * accessible on all nodes in the cluster. */
-	private final List<URL> requiredClasspaths;
-
 	/** Listeners that receive messages when the entire job switches it status
 	 * (such as from RUNNING to FINISHED) */
 	private final List<JobStatusListener> jobStatusListeners;
@@ -172,9 +163,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	// ------ Configuration of the Execution -------
 
-	/** The execution configuration (see {@link ExecutionConfig}) related to this specific job. */
-	private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
-
 	/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
 	 * to deploy them immediately. */
 	private boolean allowQueuedScheduling = false;
@@ -237,7 +225,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			Configuration jobConfig,
 			SerializedValue<ExecutionConfig> serializedConfig,
 			Time timeout,
-			RestartStrategy restartStrategy) {
+			RestartStrategy restartStrategy) throws IOException {
 		this(
 			executor,
 			jobId,
@@ -264,7 +252,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			List<BlobKey> requiredJarFiles,
 			List<URL> requiredClasspaths,
 			ClassLoader userClassLoader,
-			MetricGroup metricGroup) {
+			MetricGroup metricGroup) throws IOException {
 
 		checkNotNull(executor);
 		checkNotNull(jobId);
@@ -272,11 +260,19 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		checkNotNull(jobConfig);
 		checkNotNull(userClassLoader);
 
+		this.jobInformation = new JobInformation(
+			jobId,
+			jobName,
+			serializedConfig,
+			jobConfig,
+			requiredJarFiles,
+			requiredClasspaths);
+
+		// serialize the job information so that the serialization work is done only once
+		this.serializedJobInformation = new SerializedValue<>(jobInformation);
+
 		this.executor = executor;
 
-		this.jobID = jobId;
-		this.jobName = jobName;
-		this.jobConfiguration = jobConfig;
 		this.userClassLoader = userClassLoader;
 
 		this.tasks = new ConcurrentHashMap<JobVertexID, ExecutionJobVertex>();
@@ -290,11 +286,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		this.stateTimestamps = new long[JobStatus.values().length];
 		this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
 
-		this.requiredJarFiles = requiredJarFiles;
-		this.requiredClasspaths = requiredClasspaths;
-
-		this.serializedExecutionConfig = checkNotNull(serializedConfig);
-
 		this.timeout = timeout;
 
 		this.restartStrategy = restartStrategy;
@@ -374,7 +365,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 		// create the coordinator that triggers and commits checkpoints and holds the state
 		checkpointCoordinator = new CheckpointCoordinator(
-				jobID,
+				jobInformation.getJobId(),
 				interval,
 				checkpointTimeout,
 				minPauseBetweenCheckpoints,
@@ -461,16 +452,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * Returns a list of BLOB keys referring to the JAR files required to run this job
 	 * @return list of BLOB keys referring to the JAR files required to run this job
 	 */
-	public List<BlobKey> getRequiredJarFiles() {
-		return this.requiredJarFiles;
+	public Collection<BlobKey> getRequiredJarFiles() {
+		return jobInformation.getRequiredJarFileBlobKeys();
 	}
 
 	/**
 	 * Returns a list of classpaths referring to the directories/JAR files required to run this job
 	 * @return list of classpaths referring to the directories/JAR files required to run this job
 	 */
-	public List<URL> getRequiredClasspaths() {
-		return this.requiredClasspaths;
+	public Collection<URL> getRequiredClasspaths() {
+		return jobInformation.getRequiredClasspathURLs();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -488,14 +479,18 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return slotProvider;
 	}
 
+	public SerializedValue<JobInformation> getSerializedJobInformation() {
+		return serializedJobInformation;
+	}
+
 	@Override
 	public JobID getJobID() {
-		return jobID;
+		return jobInformation.getJobId();
 	}
 
 	@Override
 	public String getJobName() {
-		return jobName;
+		return jobInformation.getJobName();
 	}
 
 	@Override
@@ -504,7 +499,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	public Configuration getJobConfiguration() {
-		return jobConfiguration;
+		return jobInformation.getJobConfiguration();
 	}
 
 	public ClassLoader getUserClassLoader() {
@@ -663,7 +658,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			}
 
 			// create the execution job vertex and attach it to the graph
-			ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
+			ExecutionJobVertex ejv = null;
+			try {
+				ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
+			} catch (IOException e) {
+				throw new JobException("Could not create a execution job vertex for " + jobVertex.getID() + '.', e);
+			}
 			ejv.connectToPredecessors(this.intermediateResults);
 
 			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
@@ -933,26 +933,17 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	public ArchivedExecutionConfig getArchivedExecutionConfig() {
 		// create a summary of all relevant data accessed in the web interface's JobConfigHandler
 		try {
-			ExecutionConfig executionConfig = getSerializedExecutionConfig().deserializeValue(userClassLoader);
+			ExecutionConfig executionConfig = jobInformation.getSerializedExecutionConfig().deserializeValue(userClassLoader);
 			if (executionConfig != null) {
 				return executionConfig.archive();
 			}
 		} catch (IOException | ClassNotFoundException e) {
-			LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", jobID, e);
+			LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", getJobID(), e);
 		};
 		return null;
 	}
 
 	/**
-	 * Returns the serialized {@link ExecutionConfig}.
-	 *
-	 * @return ExecutionConfig
-	 */
-	public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
-		return serializedExecutionConfig;
-	}
-
-	/**
 	 * For testing: This waits until the job execution has finished.
 	 * @throws InterruptedException
 	 */
@@ -970,7 +961,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
 		if (STATE_UPDATER.compareAndSet(this, current, newState)) {
-			LOG.info("Job {} ({}) switched from state {} to {}.", jobName, jobID, current, newState, error);
+			LOG.info("Job {} ({}) switched from state {} to {}.", getJobName(), getJobID(), current, newState, error);
 
 			stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
 			notifyJobStatusChange(newState, error);
@@ -1049,20 +1040,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
 			synchronized (progressLock) {
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", jobName, jobID, failureCause);
+					LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID(), failureCause);
 				} else {
-					LOG.info("Try to restart or fail the job {} ({}) if no longer possible.", jobName, jobID);
+					LOG.info("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID());
 				}
 
 				boolean isRestartable = !(failureCause instanceof SuppressRestartsException) && restartStrategy.canRestart();
 
 				if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
-					LOG.info("Restarting the job {} ({}).", jobName, jobID);
+					LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
 					restartStrategy.restart(this);
 
 					return true;
 				} else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
-					LOG.info("Could not restart the job {} ({}).", jobName, jobID, failureCause);
+					LOG.info("Could not restart the job {} ({}).", getJobName(), getJobID(), failureCause);
 					postRunCleanup();
 
 					return true;
@@ -1196,7 +1187,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				LOG.debug("Received accumulator result for unknown execution {}.", execID);
 			}
 		} catch (Exception e) {
-			LOG.error("Cannot update accumulators for job {}.", jobID, e);
+			LOG.error("Cannot update accumulators for job {}.", getJobID(), e);
 		}
 	}
 
@@ -1223,7 +1214,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 			for (JobStatusListener listener : jobStatusListeners) {
 				try {
-					listener.jobStatusChanges(jobID, newState, timestamp, serializedError);
+					listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
 				} catch (Throwable t) {
 					LOG.warn("Error while notifying JobStatusListener", t);
 				}
@@ -1244,7 +1235,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			for (ExecutionStatusListener listener : executionListeners) {
 				try {
 					listener.executionStatusChanged(
-							jobID, vertexId, vertex.getJobVertex().getName(),
+							getJobID(), vertexId, vertex.getJobVertex().getName(),
 							vertex.getParallelism(), subtask, executionID, newExecutionState,
 							timestamp, message);
 				} catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 756464d..3be1d56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executor;
@@ -77,19 +78,25 @@ public class ExecutionGraphBuilder {
 		final JobID jobId = jobGraph.getJobID();
 
 		// create a new execution graph, if none exists so far
-		final ExecutionGraph executionGraph = (prior != null) ? prior :
-				new ExecutionGraph(
-						executor,
-						jobId,
-						jobName,
-						jobGraph.getJobConfiguration(),
-						jobGraph.getSerializedExecutionConfig(),
-						timeout,
-						restartStrategy,
-						jobGraph.getUserJarBlobKeys(),
-						jobGraph.getClasspaths(),
-						classLoader,
-						metrics);
+		final ExecutionGraph executionGraph;
+
+		try {
+			executionGraph = (prior != null) ? prior :
+					new ExecutionGraph(
+							executor,
+							jobId,
+							jobName,
+							jobGraph.getJobConfiguration(),
+							jobGraph.getSerializedExecutionConfig(),
+							timeout,
+							restartStrategy,
+							jobGraph.getUserJarBlobKeys(),
+							jobGraph.getClasspaths(),
+							classLoader,
+							metrics);
+		} catch (IOException e) {
+			throw new JobException("Could not create the execution graph.", e);
+		}
 
 		// set the basic properties
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 2bb63d4..47cfde1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -43,10 +43,12 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
 import scala.Option;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -84,13 +86,20 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	
 	private final InputSplit[] inputSplits;
 
+	/**
+	 * Serialized task information, which is the same for all sub tasks. Storing it in
+	 * serialized form avoids serializing the same information multiple times when
+	 * creating the TaskDeploymentDescriptors.
+	 */
+	private final SerializedValue<TaskInformation> serializedTaskInformation;
+
 	private InputSplitAssigner splitAssigner;
 	
 	public ExecutionJobVertex(
 		ExecutionGraph graph,
 		JobVertex jobVertex,
 		int defaultParallelism,
-		Time timeout) throws JobException {
+		Time timeout) throws JobException, IOException {
 
 		this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
 	}
@@ -100,7 +109,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		JobVertex jobVertex,
 		int defaultParallelism,
 		Time timeout,
-		long createTimestamp) throws JobException {
+		long createTimestamp) throws JobException, IOException {
 
 		if (graph == null || jobVertex == null) {
 			throw new NullPointerException();
@@ -108,18 +117,26 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		
 		this.graph = graph;
 		this.jobVertex = jobVertex;
-		
+
 		int vertexParallelism = jobVertex.getParallelism();
 		int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
-		
+
 		this.parallelism = numTaskVertices;
 
-		int maxParallelism = jobVertex.getMaxParallelism();
+		int maxP = jobVertex.getMaxParallelism();
 
-		Preconditions.checkArgument(maxParallelism >= parallelism, "The maximum parallelism (" +
-			maxParallelism + ") must be greater or equal than the parallelism (" + parallelism +
+		Preconditions.checkArgument(maxP >= parallelism, "The maximum parallelism (" +
+			maxP + ") must be greater or equal than the parallelism (" + parallelism +
 			").");
-		this.maxParallelism = maxParallelism;
+		this.maxParallelism = maxP;
+
+		this.serializedTaskInformation = new SerializedValue<>(new TaskInformation(
+			jobVertex.getID(),
+			jobVertex.getName(),
+			parallelism,
+			maxParallelism,
+			jobVertex.getInvokableClassName(),
+			jobVertex.getConfiguration()));
 
 		this.taskVertices = new ExecutionVertex[numTaskVertices];
 		
@@ -247,6 +264,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	public List<IntermediateResult> getInputs() {
 		return inputs;
 	}
+
+	public SerializedValue<TaskInformation> getSerializedTaskInformation() {
+		return serializedTaskInformation;
+	}
 	
 	public boolean isInFinalState() {
 		return numSubtasksInFinalState == parallelism;

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 8979d7c..e7f000c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -19,11 +19,9 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.Archiveable;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -47,7 +45,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -599,30 +596,19 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, queueToRequest, partitions));
 		}
 
-		SerializedValue<ExecutionConfig> serializedConfig = getExecutionGraph().getSerializedExecutionConfig();
-		List<BlobKey> jarFiles = getExecutionGraph().getRequiredJarFiles();
-		List<URL> classpaths = getExecutionGraph().getRequiredClasspaths();
+		SerializedValue<JobInformation> serializedJobInformation = getExecutionGraph().getSerializedJobInformation();
+		SerializedValue<TaskInformation> serializedJobVertexInformation = jobVertex.getSerializedTaskInformation();
 
 		return new TaskDeploymentDescriptor(
-			getJobId(),
-			getExecutionGraph().getJobName(),
-			getJobvertexId(),
+			serializedJobInformation,
+			serializedJobVertexInformation,
 			executionId,
-			serializedConfig,
-			getTaskName(),
-			getMaxParallelism(),
 			subTaskIndex,
-			getTotalNumberOfParallelSubtasks(),
 			attemptNumber,
-			getExecutionGraph().getJobConfiguration(),
-			jobVertex.getJobVertex().getConfiguration(),
-			jobVertex.getJobVertex().getInvokableClassName(),
-			producedPartitions,
-			consumedPartitions,
-			jarFiles,
-			classpaths,
 			targetSlot.getRoot().getSlotNumber(),
-			taskStateHandles);
+			taskStateHandles,
+			producedPartitions,
+			consumedPartitions);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
new file mode 100644
index 0000000..6e3c1e8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collection;
+
+/**
+ * Container class for job information which is stored in the {@link ExecutionGraph}.
+ */
+public class JobInformation implements Serializable {
+
+	private static final long serialVersionUID = 8367087049937822140L;
+
+	/** Id of the job */
+	private final JobID jobId;
+
+	/** Job name */
+	private final String jobName;
+
+	/** Serialized execution config because it can contain user code classes */
+	private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
+
+	/** Configuration of the job */
+	private final Configuration jobConfiguration;
+
+	/** Blob keys for the required jar files */
+	private final Collection<BlobKey> requiredJarFileBlobKeys;
+
+	/** URLs specifying the classpath to add to the class loader */
+	private final Collection<URL> requiredClasspathURLs;
+
+
+	public JobInformation(
+			JobID jobId,
+			String jobName,
+			SerializedValue<ExecutionConfig> serializedExecutionConfig,
+			Configuration jobConfiguration,
+			Collection<BlobKey> requiredJarFileBlobKeys,
+			Collection<URL> requiredClasspathURLs) {
+		this.jobId = Preconditions.checkNotNull(jobId);
+		this.jobName = Preconditions.checkNotNull(jobName);
+		this.serializedExecutionConfig = Preconditions.checkNotNull(serializedExecutionConfig);
+		this.jobConfiguration = Preconditions.checkNotNull(jobConfiguration);
+		this.requiredJarFileBlobKeys = Preconditions.checkNotNull(requiredJarFileBlobKeys);
+		this.requiredClasspathURLs = Preconditions.checkNotNull(requiredClasspathURLs);
+	}
+
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	public String getJobName() {
+		return jobName;
+	}
+
+	public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
+		return serializedExecutionConfig;
+	}
+
+	public Configuration getJobConfiguration() {
+		return jobConfiguration;
+	}
+
+	public Collection<BlobKey> getRequiredJarFileBlobKeys() {
+		return requiredJarFileBlobKeys;
+	}
+
+	public Collection<URL> getRequiredClasspathURLs() {
+		return requiredClasspathURLs;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
new file mode 100644
index 0000000..ccec118
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Container class for operator/task specific information which is stored in the
+ * {@link ExecutionJobVertex}. This information is shared by all sub tasks of this operator.
+ */
+public class TaskInformation implements Serializable {
+
+	private static final long serialVersionUID = -9006218793155953789L;
+
+	/** Job vertex id of the associated job vertex */
+	private final JobVertexID jobVertexId;
+
+	/** Name of the task */
+	private final String taskName;
+
+	/** The number of subtasks for this operator */
+	private final int numberOfSubtasks;
+
+	/** The maximum parallelism == number of key groups */
+	private final int numberOfKeyGroups;
+
+	/** Class name of the invokable to run */
+	private final String invokableClassName;
+
+	/** Configuration for the task */
+	private final Configuration taskConfiguration;
+
+	public TaskInformation(
+			JobVertexID jobVertexId,
+			String taskName,
+			int numberOfSubtasks,
+			int numberOfKeyGroups,
+			String invokableClassName,
+			Configuration taskConfiguration) {
+		this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
+		this.taskName = Preconditions.checkNotNull(taskName);
+		this.numberOfSubtasks = Preconditions.checkNotNull(numberOfSubtasks);
+		this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
+		this.invokableClassName = Preconditions.checkNotNull(invokableClassName);
+		this.taskConfiguration = Preconditions.checkNotNull(taskConfiguration);
+	}
+
+	public JobVertexID getJobVertexId() {
+		return jobVertexId;
+	}
+
+	public String getTaskName() {
+		return taskName;
+	}
+
+	public int getNumberOfSubtasks() {
+		return numberOfSubtasks;
+	}
+
+	public int getNumberOfKeyGroups() {
+		return numberOfKeyGroups;
+	}
+
+	public String getInvokableClassName() {
+		return invokableClassName;
+	}
+
+	public Configuration getTaskConfiguration() {
+		return taskConfiguration;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
index 3961c14..1ac8140 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
@@ -19,7 +19,8 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.util.AbstractID;
 
@@ -59,20 +60,27 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup<TaskManagerMetricG
 	//  adding / removing tasks
 	// ------------------------------------------------------------------------
 
-	public TaskMetricGroup addTask(TaskDeploymentDescriptor tdd) {
-		AbstractID vertexId = tdd.getVertexID();
-		AbstractID executionId = tdd.getExecutionId();
-		String taskName = tdd.getTaskName();
-		int subtaskIndex = tdd.getIndexInSubtaskGroup();
-		int attemptNumber = tdd.getAttemptNumber();
-
-		checkNotNull(executionId);
+	public TaskMetricGroup addTask(
+			final JobVertexID jobVertexId,
+			final ExecutionAttemptID executionAttemptID,
+			final String taskName,
+			final int subtaskIndex,
+			final int attemptNumber) {
+		checkNotNull(jobVertexId);
+		checkNotNull(executionAttemptID);
+		checkNotNull(taskName);
 
 		synchronized (this) {
 			if (!isClosed()) {
-				TaskMetricGroup task = new TaskMetricGroup(registry, this,
-					vertexId, executionId, taskName, subtaskIndex, attemptNumber);
-				tasks.put(executionId, task);
+				TaskMetricGroup task = new TaskMetricGroup(
+					registry,
+					this,
+					jobVertexId,
+					executionAttemptID,
+					taskName,
+					subtaskIndex,
+					attemptNumber);
+				tasks.put(executionAttemptID, task);
 				return task;
 			} else {
 				return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
index c3ca5fe..92c509a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
@@ -21,10 +21,12 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.CharacterFilter;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -68,11 +70,19 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup<TaskManagerMetr
 	//  job groups
 	// ------------------------------------------------------------------------
 
-	public TaskMetricGroup addTaskForJob(TaskDeploymentDescriptor tdd) {
-		JobID jobId = tdd.getJobID();
-		String jobName = tdd.getJobName().length() == 0 
-			? tdd.getJobID().toString()
-			: tdd.getJobName();
+	public TaskMetricGroup addTaskForJob(
+			final JobID jobId,
+			final String jobName,
+			final JobVertexID jobVertexId,
+			final ExecutionAttemptID executionAttemptId,
+			final String taskName,
+			final int subtaskIndex,
+			final int attemptNumber) {
+		Preconditions.checkNotNull(jobId);
+
+		String resolvedJobName = jobName == null || jobName.isEmpty()
+			? jobId.toString()
+			: jobName;
 
 		// we cannot strictly lock both our map modification and the job group modification
 		// because it might lead to a deadlock
@@ -83,14 +93,19 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup<TaskManagerMetr
 				currentJobGroup = jobs.get(jobId);
 
 				if (currentJobGroup == null || currentJobGroup.isClosed()) {
-					currentJobGroup = new TaskManagerJobMetricGroup(registry, this, jobId, jobName);
+					currentJobGroup = new TaskManagerJobMetricGroup(registry, this, jobId, resolvedJobName);
 					jobs.put(jobId, currentJobGroup);
 				}
 			}
 
 			// try to add another task. this may fail if we found a pre-existing job metrics
 			// group and it is closed concurrently
-			TaskMetricGroup taskGroup = currentJobGroup.addTask(tdd);
+			TaskMetricGroup taskGroup = currentJobGroup.addTask(
+				jobVertexId,
+				executionAttemptId,
+				taskName,
+				subtaskIndex,
+				attemptNumber);
 
 			if (taskGroup != null) {
 				// successfully added the next task

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ffcf909..827451e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -35,12 +35,13 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -70,6 +71,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.URL;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -84,8 +86,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * The Task represents one execution of a parallel subtask on a TaskManager.
  * A Task wraps a Flink operator (which may be a user function) and
@@ -144,10 +144,10 @@ public class Task implements Runnable, TaskActions {
 	private final Configuration taskConfiguration;
 
 	/** The jar files used by this task */
-	private final List<BlobKey> requiredJarFiles;
+	private final Collection<BlobKey> requiredJarFiles;
 
 	/** The classpaths used by this task */
-	private final List<URL> requiredClasspaths;
+	private final Collection<URL> requiredClasspaths;
 
 	/** The name of the class that holds the invokable code */
 	private final String nameOfInvokableClass;
@@ -249,7 +249,15 @@ public class Task implements Runnable, TaskActions {
 	 * be undone in the case of a failing task deployment.</p>
 	 */
 	public Task(
-		TaskDeploymentDescriptor tdd,
+		JobInformation jobInformation,
+		TaskInformation taskInformation,
+		ExecutionAttemptID executionAttemptID,
+		int subtaskIndex,
+		int attemptNumber,
+		Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
+		Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
+		int targetSlotNumber,
+		TaskStateHandles taskStateHandles,
 		MemoryManager memManager,
 		IOManager ioManager,
 		NetworkEnvironment networkEnvironment,
@@ -265,36 +273,48 @@ public class Task implements Runnable, TaskActions {
 		PartitionStateChecker partitionStateChecker,
 		Executor executor) {
 
-		this.taskInfo = checkNotNull(tdd.getTaskInfo());
-		this.jobId = checkNotNull(tdd.getJobID());
-		this.vertexId = checkNotNull(tdd.getVertexID());
-		this.executionId  = checkNotNull(tdd.getExecutionId());
+		Preconditions.checkNotNull(jobInformation);
+		Preconditions.checkNotNull(taskInformation);
+
+		Preconditions.checkArgument(0 <= subtaskIndex, "The subtask index must be non-negative.");
+		Preconditions.checkArgument(0 <= attemptNumber, "The attempt number must be non-negative.");
+		Preconditions.checkArgument(0 <= targetSlotNumber, "The target slot number must be non-negative.");
+
+		this.taskInfo = new TaskInfo(
+			taskInformation.getTaskName(),
+			taskInformation.getNumberOfKeyGroups(),
+			subtaskIndex,
+			taskInformation.getNumberOfSubtasks(),
+			attemptNumber);
+
+		this.jobId = jobInformation.getJobId();
+		this.vertexId = taskInformation.getJobVertexId();
+		this.executionId = Preconditions.checkNotNull(executionAttemptID);
 		this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();
-		this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
-		this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration());
-		this.requiredJarFiles = checkNotNull(tdd.getRequiredJarFiles());
-		this.requiredClasspaths = checkNotNull(tdd.getRequiredClasspaths());
-		this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
-		this.serializedExecutionConfig = checkNotNull(tdd.getSerializedExecutionConfig());
-		this.taskStateHandles = tdd.getTaskStateHandles();
-
-		Configuration taskConfig = tdd.getTaskConfiguration();
-		this.taskCancellationInterval = taskConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
-		this.taskCancellationTimeout = taskConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
-
-		this.memoryManager = checkNotNull(memManager);
-		this.ioManager = checkNotNull(ioManager);
-		this.broadcastVariableManager = checkNotNull(bcVarManager);
+		this.jobConfiguration = jobInformation.getJobConfiguration();
+		this.taskConfiguration = taskInformation.getTaskConfiguration();
+		this.requiredJarFiles = jobInformation.getRequiredJarFileBlobKeys();
+		this.requiredClasspaths = jobInformation.getRequiredClasspathURLs();
+		this.nameOfInvokableClass = taskInformation.getInvokableClassName();
+		this.serializedExecutionConfig = jobInformation.getSerializedExecutionConfig();
+		this.taskStateHandles = taskStateHandles;
+
+		this.taskCancellationInterval = taskConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
+		this.taskCancellationTimeout = taskConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
+
+		this.memoryManager = Preconditions.checkNotNull(memManager);
+		this.ioManager = Preconditions.checkNotNull(ioManager);
+		this.broadcastVariableManager = Preconditions.checkNotNull(bcVarManager);
 		this.accumulatorRegistry = new AccumulatorRegistry(jobId, executionId);
 
-		this.inputSplitProvider = checkNotNull(inputSplitProvider);
-		this.checkpointResponder = checkNotNull(checkpointResponder);
-		this.taskManagerConnection = checkNotNull(taskManagerConnection);
+		this.inputSplitProvider = Preconditions.checkNotNull(inputSplitProvider);
+		this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
+		this.taskManagerConnection = Preconditions.checkNotNull(taskManagerConnection);
 
-		this.libraryCache = checkNotNull(libraryCache);
-		this.fileCache = checkNotNull(fileCache);
-		this.network = checkNotNull(networkEnvironment);
-		this.taskManagerConfig = checkNotNull(taskManagerConfig);
+		this.libraryCache = Preconditions.checkNotNull(libraryCache);
+		this.fileCache = Preconditions.checkNotNull(fileCache);
+		this.network = Preconditions.checkNotNull(networkEnvironment);
+		this.taskManagerConfig = Preconditions.checkNotNull(taskManagerConfig);
 
 		this.taskExecutionStateListeners = new CopyOnWriteArrayList<>();
 		this.metrics = metricGroup;
@@ -306,18 +326,16 @@ public class Task implements Runnable, TaskActions {
 
 		final String taskNameWithSubtaskAndId = taskNameWithSubtask + " (" + executionId + ')';
 
-		List<ResultPartitionDeploymentDescriptor> partitions = tdd.getProducedPartitions();
-		List<InputGateDeploymentDescriptor> consumedPartitions = tdd.getInputGates();
-
 		// Produced intermediate result partitions
-		this.producedPartitions = new ResultPartition[partitions.size()];
-		this.writers = new ResultPartitionWriter[partitions.size()];
+		this.producedPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];
+		this.writers = new ResultPartitionWriter[resultPartitionDeploymentDescriptors.size()];
 
-		for (int i = 0; i < this.producedPartitions.length; i++) {
-			ResultPartitionDeploymentDescriptor desc = partitions.get(i);
+		int counter = 0;
+
+		for (ResultPartitionDeploymentDescriptor desc: resultPartitionDeploymentDescriptors) {
 			ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);
 
-			this.producedPartitions[i] = new ResultPartition(
+			this.producedPartitions[counter] = new ResultPartition(
 				taskNameWithSubtaskAndId,
 				this,
 				jobId,
@@ -330,25 +348,31 @@ public class Task implements Runnable, TaskActions {
 				ioManager,
 				networkEnvironment.getDefaultIOMode());
 
-			this.writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
+			writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);
+
+			++counter;
 		}
 
 		// Consumed intermediate result partitions
-		this.inputGates = new SingleInputGate[consumedPartitions.size()];
+		this.inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()];
 		this.inputGatesById = new HashMap<>();
 
-		for (int i = 0; i < this.inputGates.length; i++) {
+		counter = 0;
+
+		for (InputGateDeploymentDescriptor inputGateDeploymentDescriptor: inputGateDeploymentDescriptors) {
 			SingleInputGate gate = SingleInputGate.create(
 				taskNameWithSubtaskAndId,
 				jobId,
 				executionId,
-				consumedPartitions.get(i),
+				inputGateDeploymentDescriptor,
 				networkEnvironment,
 				this,
 				metricGroup.getIOMetricGroup());
 
-			this.inputGates[i] = gate;
+			inputGates[counter] = gate;
 			inputGatesById.put(gate.getConsumedResultId(), gate);
+
+			++counter;
 		}
 
 		invokableHasBeenCanceled = new AtomicBoolean(false);

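For orientation, this is roughly what the two value objects consumed by the new Task constructor carry. The sketch below is reconstructed from the accessors used in this hunk and from the constructor calls in TaskDeploymentDescriptorTest further down; the visibility, field layout and elided getters are assumptions, not the committed classes.

    import java.io.Serializable;
    import java.net.URL;
    import java.util.Collection;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.blob.BlobKey;
    import org.apache.flink.runtime.jobgraph.JobVertexID;
    import org.apache.flink.util.SerializedValue;

    /** Sketch: job-wide data, identical for every TDD of one job. */
    class JobInformation implements Serializable {
        final JobID jobId;
        final String jobName;
        final SerializedValue<ExecutionConfig> serializedExecutionConfig;
        final Configuration jobConfiguration;
        final Collection<BlobKey> requiredJarFileBlobKeys;
        final Collection<URL> requiredClasspathURLs;

        JobInformation(
                JobID jobId,
                String jobName,
                SerializedValue<ExecutionConfig> serializedExecutionConfig,
                Configuration jobConfiguration,
                Collection<BlobKey> requiredJarFileBlobKeys,
                Collection<URL> requiredClasspathURLs) {
            this.jobId = jobId;
            this.jobName = jobName;
            this.serializedExecutionConfig = serializedExecutionConfig;
            this.jobConfiguration = jobConfiguration;
            this.requiredJarFileBlobKeys = requiredJarFileBlobKeys;
            this.requiredClasspathURLs = requiredClasspathURLs;
        }
        // getters elided: getJobId(), getJobName(), getSerializedExecutionConfig(),
        // getJobConfiguration(), getRequiredJarFileBlobKeys(), getRequiredClasspathURLs()
    }

    /** Sketch: vertex-wide data, identical for every subtask of one ExecutionJobVertex. */
    class TaskInformation implements Serializable {
        final JobVertexID jobVertexId;
        final String taskName;
        final int numberOfSubtasks;
        final int numberOfKeyGroups;
        final String invokableClassName;
        final Configuration taskConfiguration;

        TaskInformation(
                JobVertexID jobVertexId,
                String taskName,
                int numberOfSubtasks,
                int numberOfKeyGroups,
                String invokableClassName,
                Configuration taskConfiguration) {
            this.jobVertexId = jobVertexId;
            this.taskName = taskName;
            this.numberOfSubtasks = numberOfSubtasks;
            this.numberOfKeyGroups = numberOfKeyGroups;
            this.invokableClassName = invokableClassName;
            this.taskConfiguration = taskConfiguration;
        }
        // getters elided: getJobVertexId(), getTaskName(), getNumberOfSubtasks(),
        // getNumberOfKeyGroups(), getInvokableClassName(), getTaskConfiguration()
    }
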
http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 3df8c26..3d72f1a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -237,8 +237,8 @@ object JobManagerMessages {
     * @param requiredClasspaths The urls of the required classpaths
     */
   case class ClassloadingProps(blobManagerPort: Integer,
-                               requiredJarFiles: java.util.List[BlobKey],
-                               requiredClasspaths: java.util.List[URL])
+                               requiredJarFiles: java.util.Collection[BlobKey],
+                               requiredClasspaths: java.util.Collection[URL])
 
   /**
    * Requests the port of the blob manager from the job manager. The result is sent back to the

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 429d961..4bb2da4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -75,7 +75,7 @@ import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 /**
  * The TaskManager is responsible for executing the individual tasks of a Flink job. It is
@@ -1120,24 +1120,46 @@ class TaskManager(
 
       val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID.orNull)
 
-      var jobName = tdd.getJobName
-      if (tdd.getJobName.length == 0) {
-        jobName = tdd.getJobID.toString()
-      } else {
-        jobName = tdd.getJobName
+      val jobInformation = try {
+        tdd.getSerializedJobInformation.deserializeValue(getClass.getClassLoader)
+      } catch {
+        case e @ (_: IOException | _: ClassNotFoundException) =>
+          throw new IOException("Could not deserialize the job information.", e)
       }
-      
-      val taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd)
+
+      val taskInformation = try {
+        tdd.getSerializedTaskInformation.deserializeValue(getClass.getClassLoader)
+      } catch {
+        case e @ (_: IOException | _: ClassNotFoundException) =>
+          throw new IOException("Could not deserialize the task information.", e)
+      }
+
+      val taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
+        jobInformation.getJobId,
+        jobInformation.getJobName,
+        taskInformation.getJobVertexId,
+        tdd.getExecutionAttemptId,
+        taskInformation.getTaskName,
+        tdd.getSubtaskIndex,
+        tdd.getAttemptNumber)
 
       val inputSplitProvider = new TaskInputSplitProvider(
         jobManagerGateway,
-        tdd.getJobID,
-        tdd.getVertexID,
-        tdd.getExecutionId,
+        jobInformation.getJobId,
+        taskInformation.getJobVertexId,
+        tdd.getExecutionAttemptId,
         config.timeout)
 
       val task = new Task(
-        tdd,
+        jobInformation,
+        taskInformation,
+        tdd.getExecutionAttemptId,
+        tdd.getSubtaskIndex,
+        tdd.getAttemptNumber,
+        tdd.getProducedPartitions,
+        tdd.getInputGates,
+        tdd.getTargetSlotNumber,
+        tdd.getTaskStateHandles,
         memoryManager,
         ioManager,
         network,
@@ -1155,7 +1177,7 @@ class TaskManager(
 
       log.info(s"Received task ${task.getTaskInfo.getTaskNameWithSubtasks()}")
 
-      val execId = tdd.getExecutionId
+      val execId = tdd.getExecutionAttemptId
       // add the task to the map
       val prevTask = runningTasks.put(execId, task)
       if (prevTask != null) {
@@ -1163,11 +1185,12 @@ class TaskManager(
         runningTasks.put(execId, prevTask)
         throw new IllegalStateException("TaskManager already contains a task for id " + execId)
       }
-      
+
       // all good, we kick off the task, which performs its own initialization
       task.startTaskThread()
 
       sender ! decorateMessage(Acknowledge.get())
+
     }
     catch {
       case t: Throwable =>

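A Java rendering of the same unpacking step, for readers following the Scala above. The helper class and method names are hypothetical; getSerializedJobInformation(), getSerializedTaskInformation() and SerializedValue#deserializeValue(ClassLoader) are the calls actually used in the hunk.

    import java.io.IOException;

    import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
    import org.apache.flink.runtime.executiongraph.JobInformation;
    import org.apache.flink.runtime.executiongraph.TaskInformation;

    final class TddUnpacker {

        // Deserialize the shared job-wide payload once, before the Task is constructed.
        static JobInformation jobInfo(TaskDeploymentDescriptor tdd, ClassLoader cl) throws IOException {
            try {
                return tdd.getSerializedJobInformation().deserializeValue(cl);
            } catch (IOException | ClassNotFoundException e) {
                throw new IOException("Could not deserialize the job information.", e);
            }
        }

        // Same for the vertex-wide payload.
        static TaskInformation taskInfo(TaskDeploymentDescriptor tdd, ClassLoader cl) throws IOException {
            try {
                return tdd.getSerializedTaskInformation().deserializeValue(cl);
            } catch (IOException | ClassNotFoundException e) {
                throw new IOException("Could not deserialize the task information.", e);
            }
        }
    }
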
http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index d352b6d..c8c9350 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -122,6 +123,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 				new DisabledCheckpointStatsTracker());
 
 		JobVertex jobVertex = new JobVertex("MockVertex");
+		jobVertex.setInvokableClass(AbstractInvokable.class);
 		executionGraph.attachJobGraph(Collections.singletonList(jobVertex));
 
 		return executionGraph;

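The recurring one-line fixture fix in this and several of the following tests exists because the invokable class name is now part of the eagerly built TaskInformation (see its constructor in TaskDeploymentDescriptorTest below), so a JobVertex without an invokable class can presumably no longer be attached to an ExecutionGraph:

    JobVertex jobVertex = new JobVertex("MockVertex");
    // Required now that the invokable class name is serialized into TaskInformation
    // at graph construction time rather than read lazily at deployment time.
    jobVertex.setInvokableClass(AbstractInvokable.class);
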
http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 39ea176..593f8d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -31,10 +31,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
@@ -60,33 +63,42 @@ public class TaskDeploymentDescriptorTest {
 			final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0);
 			final List<URL> requiredClasspaths = new ArrayList<URL>(0);
 			final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
+			final SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(new JobInformation(
+				jobID, jobName, executionConfig, jobConfiguration, requiredJars, requiredClasspaths));
+			final SerializedValue<TaskInformation> serializedTaskInformation = new SerializedValue<>(new TaskInformation(
+				vertexID, taskName, currentNumberOfSubtasks, numberOfKeyGroups, invokableClass.getName(), taskConfiguration));
+			final int targetSlotNumber = 47;
+			final TaskStateHandles taskStateHandles = new TaskStateHandles();
 
-			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, jobName, vertexID, execId,
-				executionConfig, taskName, numberOfKeyGroups, indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber,
-				jobConfiguration, taskConfiguration, invokableClass.getName(), producedResults, inputGates,
-				requiredJars, requiredClasspaths, 47);
+			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(
+				serializedJobInformation,
+				serializedTaskInformation,
+				execId,
+				indexInSubtaskGroup,
+				attemptNumber,
+				targetSlotNumber,
+				taskStateHandles,
+				producedResults,
+				inputGates);
 	
 			final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
 	
-			assertFalse(orig.getJobID() == copy.getJobID());
-			assertFalse(orig.getVertexID() == copy.getVertexID());
-			assertFalse(orig.getTaskName() == copy.getTaskName());
-			assertFalse(orig.getJobConfiguration() == copy.getJobConfiguration());
-			assertFalse(orig.getTaskConfiguration() == copy.getTaskConfiguration());
+			assertFalse(orig.getSerializedJobInformation() == copy.getSerializedJobInformation());
+			assertFalse(orig.getSerializedTaskInformation() == copy.getSerializedTaskInformation());
+			assertFalse(orig.getExecutionAttemptId() == copy.getExecutionAttemptId());
+			assertFalse(orig.getTaskStateHandles() == copy.getTaskStateHandles());
+			assertFalse(orig.getProducedPartitions() == copy.getProducedPartitions());
+			assertFalse(orig.getInputGates() == copy.getInputGates());
 
-			assertEquals(orig.getJobID(), copy.getJobID());
-			assertEquals(orig.getVertexID(), copy.getVertexID());
-			assertEquals(orig.getTaskName(), copy.getTaskName());
-			assertEquals(orig.getNumberOfKeyGroups(), copy.getNumberOfKeyGroups());
-			assertEquals(orig.getIndexInSubtaskGroup(), copy.getIndexInSubtaskGroup());
-			assertEquals(orig.getNumberOfSubtasks(), copy.getNumberOfSubtasks());
+			assertEquals(orig.getSerializedJobInformation(), copy.getSerializedJobInformation());
+			assertEquals(orig.getSerializedTaskInformation(), copy.getSerializedTaskInformation());
+			assertEquals(orig.getExecutionAttemptId(), copy.getExecutionAttemptId());
+			assertEquals(orig.getSubtaskIndex(), copy.getSubtaskIndex());
 			assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
+			assertEquals(orig.getTargetSlotNumber(), copy.getTargetSlotNumber());
+			assertEquals(orig.getTaskStateHandles(), copy.getTaskStateHandles());
 			assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
 			assertEquals(orig.getInputGates(), copy.getInputGates());
-			assertEquals(orig.getSerializedExecutionConfig(), copy.getSerializedExecutionConfig());
-
-			assertEquals(orig.getRequiredJarFiles(), copy.getRequiredJarFiles());
-			assertEquals(orig.getRequiredClasspaths(), copy.getRequiredClasspaths());
 		}
 		catch (Exception e) {
 			e.printStackTrace();

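The test above also shows the payoff of the change: both SerializedValue payloads can be built once and shared by every subtask's descriptor, so the expensive serialization happens once per job vertex instead of once per TDD. A sketch reusing the test's fixture variables (the loop bound, attempt numbers and slot numbers are illustrative):

    SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(
        new JobInformation(
            jobID, jobName, executionConfig, jobConfiguration, requiredJars, requiredClasspaths));
    SerializedValue<TaskInformation> serializedTaskInformation = new SerializedValue<>(
        new TaskInformation(
            vertexID, taskName, currentNumberOfSubtasks, numberOfKeyGroups,
            invokableClass.getName(), taskConfiguration));

    // One descriptor per subtask; the heavy payloads are serialized exactly once.
    List<TaskDeploymentDescriptor> descriptors = new ArrayList<>(currentNumberOfSubtasks);
    for (int subtaskIndex = 0; subtaskIndex < currentNumberOfSubtasks; subtaskIndex++) {
        descriptors.add(new TaskDeploymentDescriptor(
            serializedJobInformation,   // shared, already-serialized bytes
            serializedTaskInformation,  // shared, already-serialized bytes
            new ExecutionAttemptID(),
            subtaskIndex,
            0,                          // attemptNumber
            subtaskIndex,               // targetSlotNumber
            new TaskStateHandles(),     // no restored state
            producedResults,
            inputGates));
    }
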
http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index d8ada35..34043eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -37,7 +38,12 @@ public class AllVerticesIteratorTest {
 			JobVertex v2 = new JobVertex("v2");
 			JobVertex v3 = new JobVertex("v3");
 			JobVertex v4 = new JobVertex("v4");
-			
+
+			v1.setInvokableClass(AbstractInvokable.class);
+			v2.setInvokableClass(AbstractInvokable.class);
+			v3.setInvokableClass(AbstractInvokable.class);
+			v4.setInvokableClass(AbstractInvokable.class);
+
 			v1.setParallelism(1);
 			v2.setParallelism(7);
 			v3.setParallelism(3);

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index ff50573..88f9ce0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -82,6 +83,9 @@ public class ArchivedExecutionGraphTest {
 		v1.setParallelism(1);
 		v2.setParallelism(2);
 
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+
 		List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionConfig config = new ExecutionConfig();

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 6740293..6f6fcd0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -95,7 +96,13 @@ public class ExecutionGraphConstructionTest {
 		v3.setParallelism(2);
 		v4.setParallelism(11);
 		v5.setParallelism(4);
-		
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+		v3.setInvokableClass(AbstractInvokable.class);
+		v4.setInvokableClass(AbstractInvokable.class);
+		v5.setInvokableClass(AbstractInvokable.class);
+
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
 		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
 		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
@@ -137,7 +144,11 @@ public class ExecutionGraphConstructionTest {
 		v1.setParallelism(5);
 		v2.setParallelism(7);
 		v3.setParallelism(2);
-		
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+		v3.setInvokableClass(AbstractInvokable.class);
+
 		// this creates an intermediate result for v1
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
 		
@@ -171,7 +182,10 @@ public class ExecutionGraphConstructionTest {
 		JobVertex v5 = new JobVertex("vertex5");
 		v4.setParallelism(11);
 		v5.setParallelism(4);
-		
+
+		v4.setInvokableClass(AbstractInvokable.class);
+		v5.setInvokableClass(AbstractInvokable.class);
+
 		v4.connectDataSetAsInput(v2result, DistributionPattern.ALL_TO_ALL);
 		v4.connectDataSetAsInput(v3result_1, DistributionPattern.ALL_TO_ALL);
 		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
@@ -205,6 +219,10 @@ public class ExecutionGraphConstructionTest {
 		v1.setParallelism(5);
 		v2.setParallelism(7);
 		v3.setParallelism(2);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+		v3.setInvokableClass(AbstractInvokable.class);
 		
 		// this creates an intermediate result for v1
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
@@ -213,8 +231,7 @@ public class ExecutionGraphConstructionTest {
 		IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
 		IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
 		IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
-		
-		
+
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(
@@ -239,7 +256,10 @@ public class ExecutionGraphConstructionTest {
 		JobVertex v5 = new JobVertex("vertex5");
 		v4.setParallelism(11);
 		v5.setParallelism(4);
-		
+
+		v4.setInvokableClass(AbstractInvokable.class);
+		v5.setInvokableClass(AbstractInvokable.class);
+
 		v4.connectIdInput(v2result.getId(), DistributionPattern.ALL_TO_ALL);
 		v4.connectIdInput(v3result_1.getId(), DistributionPattern.ALL_TO_ALL);
 		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
@@ -469,7 +489,8 @@ public class ExecutionGraphConstructionTest {
 		// construct part one of the execution graph
 		JobVertex v1 = new JobVertex("vertex1");
 		v1.setParallelism(7);
-		
+		v1.setInvokableClass(AbstractInvokable.class);
+
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
 
 		ExecutionGraph eg = new ExecutionGraph(
@@ -490,6 +511,7 @@ public class ExecutionGraphConstructionTest {
 		
 		// attach the second part of the graph
 		JobVertex v2 = new JobVertex("vertex2");
+		v2.setInvokableClass(AbstractInvokable.class);
 		v2.connectIdInput(new IntermediateDataSetID(), DistributionPattern.ALL_TO_ALL);
 		
 		List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v2));
@@ -520,7 +542,13 @@ public class ExecutionGraphConstructionTest {
 		v3.setParallelism(2);
 		v4.setParallelism(11);
 		v5.setParallelism(4);
-		
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+		v3.setInvokableClass(AbstractInvokable.class);
+		v4.setInvokableClass(AbstractInvokable.class);
+		v5.setInvokableClass(AbstractInvokable.class);
+
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
 		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
 		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
@@ -579,6 +607,12 @@ public class ExecutionGraphConstructionTest {
 			v3.setParallelism(2);
 			v4.setParallelism(11);
 			v5.setParallelism(4);
+
+			v1.setInvokableClass(AbstractInvokable.class);
+			v2.setInvokableClass(AbstractInvokable.class);
+			v3.setInvokableClass(AbstractInvokable.class);
+			v4.setInvokableClass(AbstractInvokable.class);
+			v5.setInvokableClass(AbstractInvokable.class);
 			
 			v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
 			v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
@@ -672,6 +706,8 @@ public class ExecutionGraphConstructionTest {
 			JobVertex v2 = new JobVertex("vertex2");
 			v1.setParallelism(6);
 			v2.setParallelism(4);
+			v1.setInvokableClass(AbstractInvokable.class);
+			v2.setInvokableClass(AbstractInvokable.class);
 			
 			SlotSharingGroup sl1 = new SlotSharingGroup();
 			v1.setSlotSharingGroup(sl1);
@@ -690,6 +726,12 @@ public class ExecutionGraphConstructionTest {
 			v5.setParallelism(3);
 			v6.setParallelism(3);
 			v7.setParallelism(3);
+
+			v3.setInvokableClass(AbstractInvokable.class);
+			v4.setInvokableClass(AbstractInvokable.class);
+			v5.setInvokableClass(AbstractInvokable.class);
+			v6.setInvokableClass(AbstractInvokable.class);
+			v7.setInvokableClass(AbstractInvokable.class);
 			
 			SlotSharingGroup sl2 = new SlotSharingGroup();
 			v3.setSlotSharingGroup(sl2);
@@ -706,6 +748,7 @@ public class ExecutionGraphConstructionTest {
 			// isolated vertex
 			JobVertex v8 = new JobVertex("vertex8");
 			v8.setParallelism(2);
+			v8.setInvokableClass(AbstractInvokable.class);
 
 			JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/58204da1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index acccd98..63da1ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -25,8 +25,10 @@ import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -117,22 +119,28 @@ public class ExecutionGraphDeploymentTest {
 			TaskDeploymentDescriptor descr = instanceGateway.lastTDD;
 			assertNotNull(descr);
 
-			assertEquals(jobId, descr.getJobID());
-			assertEquals(jid2, descr.getVertexID());
-			assertEquals(3, descr.getIndexInSubtaskGroup());
-			assertEquals(10, descr.getNumberOfSubtasks());
-			assertEquals(BatchTask.class.getName(), descr.getInvokableClassName());
-			assertEquals("v2", descr.getTaskName());
+			JobInformation jobInformation = descr.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
+			TaskInformation taskInformation = descr.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
 
-			List<ResultPartitionDeploymentDescriptor> producedPartitions = descr.getProducedPartitions();
-			List<InputGateDeploymentDescriptor> consumedPartitions = descr.getInputGates();
+			assertEquals(jobId, jobInformation.getJobId());
+			assertEquals(jid2, taskInformation.getJobVertexId());
+			assertEquals(3, descr.getSubtaskIndex());
+			assertEquals(10, taskInformation.getNumberOfSubtasks());
+			assertEquals(BatchTask.class.getName(), taskInformation.getInvokableClassName());
+			assertEquals("v2", taskInformation.getTaskName());
+
+			Collection<ResultPartitionDeploymentDescriptor> producedPartitions = descr.getProducedPartitions();
+			Collection<InputGateDeploymentDescriptor> consumedPartitions = descr.getInputGates();
 
 			assertEquals(2, producedPartitions.size());
 			assertEquals(1, consumedPartitions.size());
 
-			assertEquals(10, producedPartitions.get(0).getNumberOfSubpartitions());
-			assertEquals(10, producedPartitions.get(1).getNumberOfSubpartitions());
-			assertEquals(10, consumedPartitions.get(0).getInputChannelDeploymentDescriptors().length);
+			Iterator<ResultPartitionDeploymentDescriptor> iteratorProducedPartitions = producedPartitions.iterator();
+			Iterator<InputGateDeploymentDescriptor> iteratorConsumedPartitions = consumedPartitions.iterator();
+
+			assertEquals(10, iteratorProducedPartitions.next().getNumberOfSubpartitions());
+			assertEquals(10, iteratorProducedPartitions.next().getNumberOfSubpartitions());
+			assertEquals(10, iteratorConsumedPartitions.next().getInputChannelDeploymentDescriptors().length);
 		}
 		catch (Exception e) {
 			e.printStackTrace();