You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:39 UTC

[15/63] [abbrv] Refactor job graph construction to incremental attachment based

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 82600d2..2d00f40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -16,81 +16,63 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.deployment;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.util.SerializableArrayList;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.types.StringValue;
 
 /**
  * A task deployment descriptor contains all the information necessary to deploy a task on a task manager.
- * <p>
- * This class is not thread-safe in general.
- * 
  */
 public final class TaskDeploymentDescriptor implements IOReadableWritable {
 
-	/**
-	 * The ID of the job the tasks belongs to.
-	 */
+	/** The ID of the job the tasks belongs to. */
 	private final JobID jobID;
 
-	/**
-	 * The task's execution vertex ID.
-	 */
-	private final ExecutionVertexID vertexID;
+	/** The task's job vertex ID. */
+	private final JobVertexID vertexID;
+	
+	/** The ID referencing the attempt to execute the task. */
+	private final ExecutionAttemptID executionId;
 
-	/**
-	 * The task's name.
-	 */
+	/** The task's name. */
 	private String taskName;
 
-	/**
-	 * The task's index in the subtask group.
-	 */
+	/** The task's index in the subtask group. */
 	private int indexInSubtaskGroup;
 
-	/**
-	 * The current number of subtasks.
-	 */
+	/** The current number of subtasks. */
 	private int currentNumberOfSubtasks;
 
-	/**
-	 * The configuration of the job the task belongs to.
-	 */
+	/** The configuration of the job the task belongs to. */
 	private Configuration jobConfiguration;
 
-	/**
-	 * The task's configuration object.
-	 */
+	/** The task's configuration object. */
 	private Configuration taskConfiguration;
 
+	/** The name of the class containing the task code to be executed. */
+	private String invokableClassName;
 
-	/**
-	 * The class containing the task code to be executed.
-	 */
-	private Class<? extends AbstractInvokable> invokableClass;
+	/** The list of output gate deployment descriptors. */
+	private List<GateDeploymentDescriptor> outputGates;
 
-	/**
-	 * The list of output gate deployment descriptors.
-	 */
-	private final SerializableArrayList<GateDeploymentDescriptor> outputGates;
-
-	/**
-	 * The list of input gate deployment descriptors.
-	 */
-	private final SerializableArrayList<GateDeploymentDescriptor> inputGates;
+	/** The list of input gate deployment descriptors. */
+	private List<GateDeploymentDescriptor> inputGates;
+	
+	private String[] requiredJarFiles;
+	
+	private int targetSlotNumber;
 
 	/**
 	 * Constructs a task deployment descriptor.
@@ -109,167 +91,58 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
 	 *        the configuration of the job the task belongs to
 	 * @param taskConfiguration
 	 *        the task's configuration object
-	 * @param invokableClass
+	 * @param invokableClassName
 	 *        the class containing the task code to be executed
 	 * @param outputGates
 	 *        list of output gate deployment descriptors
-	 * @param inputGateIDs
-	 *        list of input gate deployment descriptors
 	 */
-	public TaskDeploymentDescriptor(final JobID jobID, final ExecutionVertexID vertexID, final String taskName,
-			final int indexInSubtaskGroup, final int currentNumberOfSubtasks, final Configuration jobConfiguration,
-			final Configuration taskConfiguration, 
-			final Class<? extends AbstractInvokable> invokableClass,
-			final SerializableArrayList<GateDeploymentDescriptor> outputGates,
-			final SerializableArrayList<GateDeploymentDescriptor> inputGates) {
-
-		if (jobID == null) {
-			throw new IllegalArgumentException("Argument jobID must not be null");
-		}
-
-		if (vertexID == null) {
-			throw new IllegalArgumentException("Argument vertexID must not be null");
-		}
-
-		if (taskName == null) {
-			throw new IllegalArgumentException("Argument taskName must not be null");
-		}
-
-		if (indexInSubtaskGroup < 0) {
-			throw new IllegalArgumentException("Argument indexInSubtaskGroup must not be smaller than zero");
+	public TaskDeploymentDescriptor(JobID jobID, JobVertexID vertexID, ExecutionAttemptID execuionId,
+			String taskName, int indexInSubtaskGroup, int currentNumberOfSubtasks, 
+			Configuration jobConfiguration, Configuration taskConfiguration,
+			String invokableClassName,
+			List<GateDeploymentDescriptor> outputGates,
+			List<GateDeploymentDescriptor> inputGates,
+			String[] requiredJarFiles,
+			int targetSlotNumber)
+	{
+		if (jobID == null || vertexID == null || execuionId == null || taskName == null || indexInSubtaskGroup < 0 ||
+				currentNumberOfSubtasks <= indexInSubtaskGroup || jobConfiguration == null ||
+				taskConfiguration == null || invokableClassName == null || outputGates == null || inputGates == null)
+		{
+			throw new IllegalArgumentException();
 		}
-
-		if (currentNumberOfSubtasks < indexInSubtaskGroup) {
-			throw new IllegalArgumentException(
-				"Argument currentNumberOfSubtasks must not be smaller than argument indexInSubtaskGroup");
-		}
-
-		if (jobConfiguration == null) {
-			throw new IllegalArgumentException("Argument jobConfiguration must not be null");
-		}
-
-		if (taskConfiguration == null) {
-			throw new IllegalArgumentException("Argument taskConfiguration must not be null");
-		}
-
-		if (invokableClass == null) {
-			throw new IllegalArgumentException("Argument invokableClass must not be null");
-		}
-
-		if (outputGates == null) {
-			throw new IllegalArgumentException("Argument outputGates must not be null");
-		}
-
-		if (inputGates == null) {
-			throw new IllegalArgumentException("Argument inputGates must not be null");
-		}
-
+		
 		this.jobID = jobID;
 		this.vertexID = vertexID;
+		this.executionId = execuionId;
 		this.taskName = taskName;
 		this.indexInSubtaskGroup = indexInSubtaskGroup;
 		this.currentNumberOfSubtasks = currentNumberOfSubtasks;
 		this.jobConfiguration = jobConfiguration;
 		this.taskConfiguration = taskConfiguration;
-		this.invokableClass = invokableClass;
+		this.invokableClassName = invokableClassName;
 		this.outputGates = outputGates;
 		this.inputGates = inputGates;
+		this.requiredJarFiles = requiredJarFiles == null ? new String[0] : requiredJarFiles;
+		this.targetSlotNumber = targetSlotNumber;
 	}
 
 	/**
 	 * Default constructor for serialization/deserialization.
 	 */
 	public TaskDeploymentDescriptor() {
-
 		this.jobID = new JobID();
-		this.vertexID = new ExecutionVertexID();
-		this.taskName = null;
-		this.indexInSubtaskGroup = 0;
-		this.currentNumberOfSubtasks = 0;
+		this.vertexID = new JobVertexID();
+		this.executionId = new ExecutionAttemptID();
 		this.jobConfiguration = new Configuration();
 		this.taskConfiguration = new Configuration();
-		this.invokableClass = null;
-		this.outputGates = new SerializableArrayList<GateDeploymentDescriptor>();
-		this.inputGates = new SerializableArrayList<GateDeploymentDescriptor>();
-	}
-
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-
-		this.jobID.write(out);
-		this.vertexID.write(out);
-		StringRecord.writeString(out, this.taskName);
-		out.writeInt(this.indexInSubtaskGroup);
-		out.writeInt(this.currentNumberOfSubtasks);
-
-		// Write out the names of the required jar files
-		final String[] requiredJarFiles = LibraryCacheManager.getRequiredJarFiles(this.jobID);
-
-		out.writeInt(requiredJarFiles.length);
-		for (int i = 0; i < requiredJarFiles.length; i++) {
-			StringRecord.writeString(out, requiredJarFiles[i]);
-		}
-
-		// Write out the name of the invokable class
-		if (this.invokableClass == null) {
-			throw new IOException("this.invokableClass is null");
-		}
-
-		StringRecord.writeString(out, this.invokableClass.getName());
-
-		this.jobConfiguration.write(out);
-		this.taskConfiguration.write(out);
-
-		this.outputGates.write(out);
-		this.inputGates.write(out);
+		this.outputGates = Collections.emptyList();
+		this.inputGates = Collections.emptyList();
+		this.requiredJarFiles = new String[0];
 	}
 
 
-	@SuppressWarnings("unchecked")
-	@Override
-	public void read(final DataInputView in) throws IOException {
-
-		this.jobID.read(in);
-		this.vertexID.read(in);
-		this.taskName = StringRecord.readString(in);
-		this.indexInSubtaskGroup = in.readInt();
-		this.currentNumberOfSubtasks = in.readInt();
-
-		// Read names of required jar files
-		final String[] requiredJarFiles = new String[in.readInt()];
-		for (int i = 0; i < requiredJarFiles.length; i++) {
-			requiredJarFiles[i] = StringRecord.readString(in);
-		}
-
-		// Now register data with the library manager
-		LibraryCacheManager.register(this.jobID, requiredJarFiles);
-
-		// Get ClassLoader from Library Manager
-		final ClassLoader cl = LibraryCacheManager.getClassLoader(this.jobID);
-
-		// Read the name of the invokable class;
-		final String invokableClassName = StringRecord.readString(in);
-
-		if (invokableClassName == null) {
-			throw new IOException("invokableClassName is null");
-		}
-
-		try {
-			this.invokableClass = (Class<? extends AbstractInvokable>) Class.forName(invokableClassName, true, cl);
-		} catch (ClassNotFoundException cnfe) {
-			throw new IOException("Class " + invokableClassName + " not found in one of the supplied jar files: "
-				+ StringUtils.stringifyException(cnfe));
-		}
-
-		this.jobConfiguration = new Configuration(cl);
-		this.jobConfiguration.read(in);
-		this.taskConfiguration = new Configuration(cl);
-		this.taskConfiguration.read(in);
 
-		this.outputGates.read(in);
-		this.inputGates.read(in);
-	}
 
 	/**
 	 * Returns the ID of the job the tasks belongs to.
@@ -277,7 +150,6 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
 	 * @return the ID of the job the tasks belongs to
 	 */
 	public JobID getJobID() {
-
 		return this.jobID;
 	}
 
@@ -286,10 +158,13 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
 	 * 
 	 * @return the task's execution vertex ID
 	 */
-	public ExecutionVertexID getVertexID() {
-
+	public JobVertexID getVertexID() {
 		return this.vertexID;
 	}
+	
+	public ExecutionAttemptID getExecutionId() {
+		return executionId;
+	}
 
 	/**
 	 * Returns the task's name.
@@ -297,7 +172,6 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
 	 * @return the task's name
 	 */
 	public String getTaskName() {
-
 		return this.taskName;
 	}
 
@@ -307,7 +181,6 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
 	 * @return the task's index in the subtask group
 	 */
 	public int getIndexInSubtaskGroup() {
-
 		return this.indexInSubtaskGroup;
 	}
 
@@ -317,7 +190,6 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
 	 * @return the current number of subtasks
 	 */
 	public int getCurrentNumberOfSubtasks() {
-
 		return this.currentNumberOfSubtasks;
 	}
 
@@ -327,7 +199,6 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
 	 * @return the configuration of the job the tasks belongs to
 	 */
 	public Configuration getJobConfiguration() {
-
 		return this.jobConfiguration;
 	}
 
@@ -337,61 +208,97 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
 	 * @return the task's configuration object
 	 */
 	public Configuration getTaskConfiguration() {
-
 		return this.taskConfiguration;
 	}
 
 	/**
-	 * Returns the class containing the task code to be executed.
+	 * Returns the name of the class containing the task code to be executed.
 	 * 
-	 * @return the class containing the task code to be executed
+	 * @return The name of the class containing the task code to be executed
 	 */
-	public Class<? extends AbstractInvokable> getInvokableClass() {
-
-		return this.invokableClass;
+	public String getInvokableClassName() {
+		return this.invokableClassName;
 	}
 
-	/**
-	 * Returns the number of output gate deployment descriptors contained in this task deployment descriptor.
-	 * 
-	 * @return the number of output gate deployment descriptors
-	 */
-	public int getNumberOfOutputGateDescriptors() {
-
-		return this.outputGates.size();
+	public List<GateDeploymentDescriptor> getOutputGates() {
+		return outputGates;
 	}
-
-	/**
-	 * Returns the output gate descriptor with the given index
-	 * 
-	 * @param index
-	 *        the index if the output gate descriptor to return
-	 * @return the output gate descriptor with the given index
-	 */
-	public GateDeploymentDescriptor getOutputGateDescriptor(final int index) {
-
-		return this.outputGates.get(index);
+	
+	public List<GateDeploymentDescriptor> getInputGates() {
+		return inputGates;
 	}
-
-	/**
-	 * Returns the number of output gate deployment descriptors contained in this task deployment descriptor.
-	 * 
-	 * @return the number of output gate deployment descriptors
-	 */
-	public int getNumberOfInputGateDescriptors() {
-
-		return this.inputGates.size();
+	
+	// --------------------------------------------------------------------------------------------
+	//  Serialization
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void write(final DataOutputView out) throws IOException {
+		jobID.write(out);
+		vertexID.write(out);
+		executionId.write(out);
+		
+		StringValue.writeString(taskName, out);
+		StringValue.writeString(invokableClassName, out);
+		
+		out.writeInt(indexInSubtaskGroup);
+		out.writeInt(currentNumberOfSubtasks);
+		out.writeInt(targetSlotNumber);
+
+		jobConfiguration.write(out);
+		taskConfiguration.write(out);
+
+		writeGateList(inputGates, out);
+		writeGateList(outputGates, out);
+				
+		out.writeInt(requiredJarFiles.length);
+		for (int i = 0; i < requiredJarFiles.length; i++) {
+			StringValue.writeString(requiredJarFiles[i], out);
+		}
 	}
 
-	/**
-	 * Returns the input gate descriptor with the given index
-	 * 
-	 * @param index
-	 *        the index if the input gate descriptor to return
-	 * @return the input gate descriptor with the given index
-	 */
-	public GateDeploymentDescriptor getInputGateDescriptor(final int index) {
-
-		return this.inputGates.get(index);
+	@Override
+	public void read(DataInputView in) throws IOException {
+		jobID.read(in);
+		vertexID.read(in);
+		executionId.read(in);
+		
+		taskName = StringValue.readString(in);
+		invokableClassName = StringValue.readString(in);
+		
+		indexInSubtaskGroup = in.readInt();
+		currentNumberOfSubtasks = in.readInt();
+		targetSlotNumber = in.readInt();
+		
+		jobConfiguration.read(in);
+		taskConfiguration.read(in);
+
+		inputGates = readGateList(in);
+		outputGates = readGateList(in);
+
+		String[] jarFiles = new String[in.readInt()];
+		for (int i = 0; i < jarFiles.length; i++) {
+			jarFiles[i] = StringValue.readString(in);
+		}
+	}
+	
+	private static final void writeGateList(List<GateDeploymentDescriptor> list, DataOutputView out) throws IOException {
+		out.writeInt(list.size());
+		for (GateDeploymentDescriptor gdd : list) {
+			gdd.write(out);
+		}
+	}
+	
+	private static final List<GateDeploymentDescriptor> readGateList(DataInputView in) throws IOException {
+		final int len = in.readInt();
+		ArrayList<GateDeploymentDescriptor> list = new ArrayList<GateDeploymentDescriptor>(len);
+		
+		for (int i = 0; i < len; i++) {
+			GateDeploymentDescriptor gdd = new GateDeploymentDescriptor();
+			gdd.read(in);
+			list.add(gdd);
+		}
+		
+		return list;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/AbstractEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/AbstractEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/AbstractEvent.java
index 61970da..d7e4712 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/AbstractEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/AbstractEvent.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.event.job;
 
 import java.io.IOException;
@@ -30,29 +29,20 @@ import org.apache.flink.core.memory.DataOutputView;
 
 /**
  * An abstract event is transmitted from the job manager to the
- * job client in order to inform the user about the job progress.
- * 
+ * job client in order to inform the user about the job progress
  */
 public abstract class AbstractEvent implements IOReadableWritable {
 
-	/**
-	 * Static variable that points to the current global sequence number
-	 */
+	/** Static variable that points to the current global sequence number */
 	private static final AtomicLong GLOBAL_SEQUENCE_NUMBER = new AtomicLong(0);
 
-	/**
-	 * Auxiliary object which helps to convert a {@link Date} object to the given string representation.
-	 */
+	/** Auxiliary object which helps to convert a {@link Date} object to the given string representation. */
 	private static final SimpleDateFormat DATA_FORMATTER = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
 
-	/**
-	 * The timestamp of the event.
-	 */
+	/** The timestamp of the event. */
 	private long timestamp = -1;
 
-	/**
-	 * The sequence number of the event.
-	 */
+	/** The sequence number of the event. */
 	private long sequenceNumber = -1;
 
 	/**
@@ -61,7 +51,7 @@ public abstract class AbstractEvent implements IOReadableWritable {
 	 * @param timestamp
 	 *        the timestamp of the event.
 	 */
-	public AbstractEvent(final long timestamp) {
+	public AbstractEvent(long timestamp) {
 		this.timestamp = timestamp;
 		this.sequenceNumber = GLOBAL_SEQUENCE_NUMBER.incrementAndGet();
 	}
@@ -75,23 +65,17 @@ public abstract class AbstractEvent implements IOReadableWritable {
 	 * is required for the deserialization process and is not
 	 * supposed to be called directly.
 	 */
-	public AbstractEvent() {
-	}
+	public AbstractEvent() {}
 
 
 	@Override
-	public void read(final DataInputView in) throws IOException {
-
-		// Read the timestamp
+	public void read(DataInputView in) throws IOException {
 		this.timestamp = in.readLong();
 		this.sequenceNumber = in.readLong();
 	}
 
-
 	@Override
-	public void write(final DataOutputView out) throws IOException {
-
-		// Write the timestamp
+	public void write(DataOutputView out) throws IOException {
 		out.writeLong(this.timestamp);
 		out.writeLong(this.sequenceNumber);
 	}
@@ -113,18 +97,14 @@ public abstract class AbstractEvent implements IOReadableWritable {
 	 *        the timestamp in milliseconds since the beginning of "the epoch"
 	 * @return the string unified representation of the timestamp
 	 */
-	public static String timestampToString(final long timestamp) {
-
+	public static String timestampToString(long timestamp) {
 		return DATA_FORMATTER.format(new Date(timestamp));
-
 	}
 
 
 	@Override
 	public boolean equals(final Object obj) {
-
 		if (obj instanceof AbstractEvent) {
-
 			final AbstractEvent abstractEvent = (AbstractEvent) obj;
 			if (this.timestamp == abstractEvent.getTimestamp()) {
 				return true;
@@ -137,7 +117,6 @@ public abstract class AbstractEvent implements IOReadableWritable {
 
 	@Override
 	public int hashCode() {
-
-		return (int) (this.timestamp % Integer.MAX_VALUE);
+		return (int) (this.timestamp ^ (this.timestamp >>> 32));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java
index bb55eac..15bae60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java
@@ -23,13 +23,11 @@ import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.ExecutionState2;
 import org.apache.flink.runtime.managementgraph.ManagementVertexID;
-import org.apache.flink.runtime.util.EnumUtils;
 
 /**
- * An {@link ExecutionStateChangeEvent} can be used to notify other objects about an execution state change of a vertex.
- * 
+ * An {@link ExecutionStateChangeEvent} can be used to notify other objects about an execution state change of a vertex. 
  */
 public final class ExecutionStateChangeEvent extends AbstractEvent implements ManagementEvent {
 
@@ -41,7 +39,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
 	/**
 	 * The new execution state of the vertex this event refers to.
 	 */
-	private ExecutionState newExecutionState;
+	private ExecutionState2 newExecutionState;
 
 	/**
 	 * Constructs a new vertex event object.
@@ -53,8 +51,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
 	 * @param newExecutionState
 	 *        the new execution state of the vertex this event refers to
 	 */
-	public ExecutionStateChangeEvent(final long timestamp, final ManagementVertexID managementVertexID,
-			final ExecutionState newExecutionState) {
+	public ExecutionStateChangeEvent(long timestamp, ManagementVertexID managementVertexID, ExecutionState2 newExecutionState) {
 		super(timestamp);
 		this.managementVertexID = managementVertexID;
 		this.newExecutionState = newExecutionState;
@@ -69,7 +66,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
 		super();
 
 		this.managementVertexID = new ManagementVertexID();
-		this.newExecutionState = ExecutionState.CREATED;
+		this.newExecutionState = ExecutionState2.CREATED;
 	}
 
 	/**
@@ -86,28 +83,24 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
 	 * 
 	 * @return the new execution state of the vertex this event refers to
 	 */
-	public ExecutionState getNewExecutionState() {
+	public ExecutionState2 getNewExecutionState() {
 		return this.newExecutionState;
 	}
 
 
 	@Override
-	public void read(final DataInputView in) throws IOException {
-
+	public void read(DataInputView in) throws IOException {
 		super.read(in);
-
 		this.managementVertexID.read(in);
-		this.newExecutionState = EnumUtils.readEnum(in, ExecutionState.class);
+		this.newExecutionState = ExecutionState2.values()[in.readInt()];
 	}
 
 
 	@Override
-	public void write(final DataOutputView out) throws IOException {
-
+	public void write(DataOutputView out) throws IOException {
 		super.write(out);
-
 		this.managementVertexID.write(out);
-		EnumUtils.writeEnum(out, this.newExecutionState);
+		out.writeInt(this.newExecutionState.ordinal());
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/JobEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/JobEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/JobEvent.java
index 210fb54..2f32686 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/JobEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/JobEvent.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.event.job;
 
 import java.io.IOException;
@@ -30,18 +29,13 @@ import org.apache.flink.runtime.util.EnumUtils;
 /**
  * A job event object is used by the job manager to inform a client about
  * changes of the job's status.
- * 
  */
 public class JobEvent extends AbstractEvent {
 
-	/**
-	 * The current status of the job.
-	 */
+	/** The current status of the job. */
 	private JobStatus currentJobStatus;
 
-	/**
-	 * An optional message attached to the event, possibly <code>null</code>.
-	 */
+	/** An optional message attached to the event, possibly <code>null</code>. */
 	private String optionalMessage = null;
 
 	/**
@@ -68,8 +62,7 @@ public class JobEvent extends AbstractEvent {
 	 */
 	public JobEvent() {
 		super();
-
-		this.currentJobStatus = JobStatus.SCHEDULED;
+		this.currentJobStatus = JobStatus.CREATED;
 	}
 
 
@@ -111,20 +104,17 @@ public class JobEvent extends AbstractEvent {
 	 * @return the optional message, possibly <code>null</code>.
 	 */
 	public String getOptionalMessage() {
-
 		return this.optionalMessage;
 	}
 
 
 	public String toString() {
-
 		return timestampToString(getTimestamp()) + ":\tJob execution switched to status " + this.currentJobStatus;
 	}
 
 
 	@Override
 	public boolean equals(final Object obj) {
-
 		if (!super.equals(obj)) {
 			return false;
 		}
@@ -154,7 +144,6 @@ public class JobEvent extends AbstractEvent {
 
 	@Override
 	public int hashCode() {
-
 		return super.hashCode();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ManagementEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ManagementEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ManagementEvent.java
index 4b33f47..695ae5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ManagementEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ManagementEvent.java
@@ -16,14 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.event.job;
 
 /**
  * This is a marker interface only. It marks events which
- * should only be accessible via the {@link ExtendedManagementProtocol}.
- * 
+ * should only be accessible via the {@link org.apache.flink.runtime.protocols.ExtendedManagementProtocol}.
  */
-public interface ManagementEvent {
-
-}
+public interface ManagementEvent {}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexAssignmentEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexAssignmentEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexAssignmentEvent.java
deleted file mode 100644
index 0e048dd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexAssignmentEvent.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.event.job;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.managementgraph.ManagementVertexID;
-
-/**
- * A {@link VertexAssignmentEvent} can be used to notify other objects about changes in the assignment of vertices to
- * instances.
- * 
- */
-public final class VertexAssignmentEvent extends AbstractEvent implements ManagementEvent {
-
-	/**
-	 * The ID identifies the vertex this events refers to.
-	 */
-	private ManagementVertexID managementVertexID;
-
-	/**
-	 * The name of the instance the vertex is now assigned to.
-	 */
-	private String instanceName;
-
-	/**
-	 * Constructs a new event.
-	 * 
-	 * @param timestamp
-	 *        the timestamp of the event
-	 * @param managementVertexID
-	 *        identifies the vertex this event refers to
-	 * @param instanceName
-	 *        the name of the instance the vertex is now assigned to
-	 */
-	public VertexAssignmentEvent(final long timestamp, final ManagementVertexID managementVertexID,
-			final String instanceName) {
-		super(timestamp);
-
-		this.managementVertexID = managementVertexID;
-		this.instanceName = instanceName;
-	}
-
-	/**
-	 * Constructor for serialization/deserialization. Should not be called on other occasions.
-	 */
-	public VertexAssignmentEvent() {
-		super();
-
-		this.managementVertexID = new ManagementVertexID();
-	}
-
-	/**
-	 * Returns the ID of the vertex this event refers to.
-	 * 
-	 * @return the ID of the vertex this event refers to
-	 */
-	public ManagementVertexID getVertexID() {
-		return this.managementVertexID;
-	}
-
-	/**
-	 * Returns the name of the instance the vertex is now assigned to.
-	 * 
-	 * @return the name of the instance the vertex is now assigned to
-	 */
-	public String getInstanceName() {
-		return this.instanceName;
-	}
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-
-		super.read(in);
-
-		this.managementVertexID.read(in);
-		this.instanceName = StringRecord.readString(in);
-	}
-
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-
-		super.write(out);
-
-		this.managementVertexID.write(out);
-		StringRecord.writeString(out, this.instanceName);
-	}
-
-
-	@Override
-	public boolean equals(final Object obj) {
-
-		if (!super.equals(obj)) {
-			return false;
-		}
-
-		if (!(obj instanceof VertexAssignmentEvent)) {
-			return false;
-		}
-
-		final VertexAssignmentEvent vae = (VertexAssignmentEvent) obj;
-
-		if (!this.managementVertexID.equals(vae.getVertexID())) {
-			return false;
-		}
-
-		if (this.instanceName == null) {
-			if (vae.getInstanceName() != null) {
-				return false;
-			}
-		} else {
-			if (!this.instanceName.equals(vae.getInstanceName())) {
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-
-	@Override
-	public int hashCode() {
-
-		if (this.managementVertexID != null) {
-			return this.managementVertexID.hashCode();
-		}
-
-		return super.hashCode();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java
index 69ab16d..fb3b247 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.event.job;
 
 import java.io.IOException;
@@ -24,46 +23,31 @@ import java.io.IOException;
 import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.ExecutionState2;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.util.EnumUtils;
 
 /**
- * Vertex events are transmitted from the job manager
- * to the job client in order to inform the user about
+ * Vertex events are transmitted from the job manager to the job client in order to inform the user about
  * changes in terms of a tasks execution state.
- * 
  */
 public class VertexEvent extends AbstractEvent {
 
-	/**
-	 * The ID of the job vertex this event belongs to.
-	 */
+	/** The ID of the job vertex this event belongs to. */
 	private JobVertexID jobVertexID;
 
-	/**
-	 * The name of the job vertex this event belongs to.
-	 */
+	/** The name of the job vertex this event belongs to. */
 	private String jobVertexName;
 
-	/**
-	 * The number of subtasks the corresponding vertex has been split into at runtime.
-	 */
+	/** The number of subtasks the corresponding vertex has been split into at runtime. */
 	private int totalNumberOfSubtasks;
 
-	/**
-	 * The index of the subtask that this event belongs to.
-	 */
+	/** The index of the subtask that this event belongs to. */
 	private int indexOfSubtask;
 
-	/**
-	 * The current execution state of the subtask this event belongs to.
-	 */
-	private ExecutionState currentExecutionState;
+	/** The current execution state of the subtask this event belongs to. */
+	private ExecutionState2 currentExecutionState;
 
-	/**
-	 * An optional more detailed description of the event.
-	 */
+	/** An optional more detailed description of the event. */
 	private String description;
 
 	/**
@@ -84,10 +68,12 @@ public class VertexEvent extends AbstractEvent {
 	 * @param description
 	 *        an optional description
 	 */
-	public VertexEvent(final long timestamp, final JobVertexID jobVertexID, final String jobVertexName,
-			final int totalNumberOfSubtasks, final int indexOfSubtask, final ExecutionState currentExecutionState,
-			final String description) {
+	public VertexEvent(long timestamp, JobVertexID jobVertexID, String jobVertexName,
+			int totalNumberOfSubtasks, int indexOfSubtask, ExecutionState2 currentExecutionState,
+			String description)
+	{
 		super(timestamp);
+		
 		this.jobVertexID = jobVertexID;
 		this.jobVertexName = jobVertexName;
 		this.totalNumberOfSubtasks = totalNumberOfSubtasks;
@@ -108,7 +94,7 @@ public class VertexEvent extends AbstractEvent {
 		this.jobVertexName = null;
 		this.totalNumberOfSubtasks = -1;
 		this.indexOfSubtask = -1;
-		this.currentExecutionState = ExecutionState.CREATED;
+		this.currentExecutionState = ExecutionState2.CREATED;
 		this.description = null;
 	}
 
@@ -122,7 +108,7 @@ public class VertexEvent extends AbstractEvent {
 		this.jobVertexName = StringRecord.readString(in);
 		this.totalNumberOfSubtasks = in.readInt();
 		this.indexOfSubtask = in.readInt();
-		this.currentExecutionState = EnumUtils.readEnum(in, ExecutionState.class);
+		this.currentExecutionState = ExecutionState2.values()[in.readInt()];
 		this.description = StringRecord.readString(in);
 	}
 
@@ -136,7 +122,7 @@ public class VertexEvent extends AbstractEvent {
 		StringRecord.writeString(out, this.jobVertexName);
 		out.writeInt(this.totalNumberOfSubtasks);
 		out.writeInt(this.indexOfSubtask);
-		EnumUtils.writeEnum(out, this.currentExecutionState);
+		out.writeInt(this.currentExecutionState.ordinal());
 		StringRecord.writeString(out, this.description);
 	}
 
@@ -183,7 +169,7 @@ public class VertexEvent extends AbstractEvent {
 	 * 
 	 * @return the current execution state of the subtask this event belongs to
 	 */
-	public ExecutionState getCurrentExecutionState() {
+	public ExecutionState2 getCurrentExecutionState() {
 		return currentExecutionState;
 	}
 
@@ -261,7 +247,6 @@ public class VertexEvent extends AbstractEvent {
 
 	@Override
 	public int hashCode() {
-
 		return super.hashCode();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
index e18b1eb..22c0620 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
@@ -16,17 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.event.task;
 
 import org.apache.flink.core.io.IOReadableWritable;
 
 /**
  * This type of event can be used to exchange notification messages between
- * different {@link TaskManager} objects at runtime using the communication
- * channels Nephele has established between different tasks.
- * 
+ * different {@link org.apache.flink.runtime.taskmanager.TaskManager} objects
+ * at runtime using the communication channels.
  */
-public abstract class AbstractEvent implements IOReadableWritable {
-
-}
+public abstract class AbstractEvent implements IOReadableWritable {}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 51ba96c..06c9fb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -82,20 +82,6 @@ public interface Environment {
 	int getIndexInSubtaskGroup();
 
 	/**
-	 * Sends a notification that objects that a new user thread has been started to the execution observer.
-	 *
-	 * @param userThread the user thread which has been started
-	 */
-	void userThreadStarted(Thread userThread);
-
-	/**
-	 * Sends a notification that a user thread has finished to the execution observer.
-	 *
-	 * @param userThread the user thread which has finished
-	 */
-	void userThreadFinished(Thread userThread);
-
-	/**
 	 * Returns the input split provider assigned to this environment.
 	 *
 	 * @return the input split provider or <code>null</code> if no such provider has been assigned to this environment.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionAttempt.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionAttempt.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionAttempt.java
new file mode 100644
index 0000000..9b39851
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionAttempt.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/**
+ * An attempt to execute a task for a {@link ExecutionVertex2}.
+ */
+public class ExecutionAttempt implements java.io.Serializable {
+	
+	private static final long serialVersionUID = 1L;
+	
+
+	private final JobVertexID vertexId;
+	
+	private final int subtaskIndex;
+	
+	private final ExecutionAttemptID executionId;
+	
+	private final int attempt;
+
+	// --------------------------------------------------------------------------------------------
+	
+	public ExecutionAttempt(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId, int attempt) {
+		if (vertexId == null || executionId == null || subtaskIndex < 0 || attempt < 1) {
+			throw new IllegalArgumentException();
+		}
+		
+		this.vertexId = vertexId;
+		this.subtaskIndex = subtaskIndex;
+		this.executionId = executionId;
+		this.attempt = attempt;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public JobVertexID getVertexId() {
+		return vertexId;
+	}
+	
+	public int getSubtaskIndex() {
+		return subtaskIndex;
+	}
+	
+	public ExecutionAttemptID getExecutionId() {
+		return executionId;
+	}
+	
+	public int getAttempt() {
+		return attempt;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return vertexId.hashCode() +
+				executionId.hashCode() +
+				31 * subtaskIndex +
+				17 * attempt;
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof ExecutionAttempt) {
+			ExecutionAttempt other = (ExecutionAttempt) obj;
+			return this.executionId.equals(other.executionId) &&
+					this.vertexId.equals(other.vertexId) &&
+					this.subtaskIndex == other.subtaskIndex &&
+					this.attempt == other.attempt;
+		} else {
+			return false;
+		}
+	}
+	
+	@Override
+	public String toString() {
+		return String.format("ExecutionAttempt (vertex=%s, subtask=%d, executionAttemptId=%s, attempt=%d)",
+				vertexId, subtaskIndex, executionId, attempt);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
index ded630f..b08c847 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
@@ -18,55 +18,16 @@
 
 package org.apache.flink.runtime.execution;
 
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 /**
- * This interface must be implemented by classes which should be able to receive notifications about
+ * Implementing this interface allows classes to receive notifications about
  * changes of a task's execution state.
  */
 public interface ExecutionListener {
 
-	/**
-	 * Called when the execution state of the associated task has changed. It is important to point out that multiple
-	 * execution listeners can be invoked as a reaction to a state change, according to their priority. As a result, the
-	 * value of <code>newExecutionState</code> may be out-dated by the time a particular execution listener is called.
-	 * To determine the most recent state of the respective task, it is recommended to store a reference on the
-	 * execution that represents it and then call <code>getExecutionState()</code> on the vertex within this method.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the task belongs to
-	 * @param vertexID
-	 *        the ID of the task whose state has changed
-	 * @param newExecutionState
-	 *        the execution state the task has just switched to
-	 * @param optionalMessage
-	 *        an optional message providing further information on the state change
-	 */
-	void executionStateChanged(JobID jobID, ExecutionVertexID vertexID, ExecutionState newExecutionState,
-			String optionalMessage);
-
-	/**
-	 * Called when the user task has started a new thread.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the task belongs to
-	 * @param vertexID
-	 *        the ID of the task that started of new thread
-	 * @param userThread
-	 *        the user thread which has been started
-	 */
-	void userThreadStarted(JobID jobID, ExecutionVertexID vertexID, Thread userThread);
-
-	/**
-	 * Called when a thread spawn by a user task has finished.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the task belongs to
-	 * @param vertexID
-	 *        the ID of the task whose thread has finished
-	 * @param userThread
-	 *        the user thread which has finished
-	 */
-	void userThreadFinished(JobID jobID, ExecutionVertexID vertexID, Thread userThread);
+	void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
+			ExecutionState2 newExecutionState, String optionalMessage);
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java
index c3b9b72..20a6180 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.execution;
 
 public interface ExecutionObserver {
@@ -29,23 +28,7 @@ public interface ExecutionObserver {
 	 * @param optionalMessage
 	 *        an optional message providing further information on the state change
 	 */
-	void executionStateChanged(ExecutionState newExecutionState, String optionalMessage);
-
-	/**
-	 * Called when the user task has started a new thread.
-	 * 
-	 * @param userThread
-	 *        the user thread which has been started
-	 */
-	void userThreadStarted(Thread userThread);
-
-	/**
-	 * Called when a thread spawn by a user task has finished.
-	 * 
-	 * @param userThread
-	 *        the user thread which has finished
-	 */
-	void userThreadFinished(Thread userThread);
+	void executionStateChanged(ExecutionState2 newExecutionState, String optionalMessage);
 
 	/**
 	 * Returns whether the task has been canceled.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState2.java
new file mode 100644
index 0000000..c2b2070
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState2.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution;
+
+public enum ExecutionState2 {
+
+	CREATED,
+	
+	SCHEDULED,
+	
+	DEPLOYING,
+	
+	RUNNING,
+	
+	FINISHED,
+	
+	CANCELING,
+	
+	CANCELED,
+	
+	FAILED
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java
deleted file mode 100644
index 98557f9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.execution;
-
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELING;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is a utility class to check the consistency of Nephele's execution state model.
- * 
- */
-public final class ExecutionStateTransition {
-
-	/**
-	 * The log object used for debugging.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(ExecutionStateTransition.class);
-
-	/**
-	 * Private constructor to prevent instantiation of object.
-	 */
-	private ExecutionStateTransition() {
-	}
-
-	/**
-	 * Checks the transition of the execution state and outputs an error in case of an unexpected state transition.
-	 * 
-	 * @param jobManager
-	 *        <code>true</code> to indicate the method is called by the job manager,
-	 *        <code>false/<code> to indicate it is called by a task manager
-	 * @param taskName
-	 *        the name of the task whose execution has changed
-	 * @param oldState
-	 *        the old execution state
-	 * @param newState
-	 *        the new execution state
-	 */
-	public static void checkTransition(boolean jobManager, String taskName, ExecutionState oldState, ExecutionState newState) {
-
-		LOG.info((jobManager ? "JM: " : "TM: ") + "ExecutionState set from " + oldState + " to " + newState + " for task " + taskName);
-
-		boolean unexpectedStateChange = true;
-
-		// This is the regular life cycle of a task
-		if (oldState == ExecutionState.CREATED && newState == ExecutionState.SCHEDULED) {
-			unexpectedStateChange = false;
-		}
-		else if (oldState == ExecutionState.SCHEDULED && newState == ExecutionState.ASSIGNED) {
-			unexpectedStateChange = false;
-		}
-		else if (oldState == ExecutionState.ASSIGNED && newState == ExecutionState.READY) {
-			unexpectedStateChange = false;
-		}
-		else if (oldState == ExecutionState.READY && newState == ExecutionState.STARTING) {
-			unexpectedStateChange = false;
-		}
-		else if (oldState == ExecutionState.STARTING && newState == ExecutionState.RUNNING) {
-			unexpectedStateChange = false;
-		}
-		else if (oldState == ExecutionState.RUNNING && newState == ExecutionState.FINISHING) {
-			unexpectedStateChange = false;
-		}
-		else if (oldState == ExecutionState.FINISHING && newState == ExecutionState.FINISHED) {
-			unexpectedStateChange = false;
-		}
-
-		// A vertex might skip the SCHEDULED state if its resource has been allocated in a previous stage.
-		else if (oldState == ExecutionState.CREATED && newState == ExecutionState.ASSIGNED) {
-			unexpectedStateChange = false;
-		}
-
-		// This transition can appear if a task in a stage which is not yet executed gets canceled.
-		else if (oldState == ExecutionState.SCHEDULED && newState == ExecutionState.CANCELING) {
-			unexpectedStateChange = false;
-		}
-
-		// This transition can appear if a task in a stage which is not yet executed gets canceled.
-		else if (oldState == ExecutionState.ASSIGNED && newState == ExecutionState.CANCELING) {
-			unexpectedStateChange = false;
-		}
-
-		// This transition can appear if a task is canceled that is not yet running on the task manager.
-		else if (oldState == ExecutionState.READY && newState == ExecutionState.CANCELING) {
-			unexpectedStateChange = false;
-		}
-
-		// -------------- error cases --------------
-		else if (newState == FAILED || newState == CANCELED || newState == CANCELING) {
-			// any state may fail or cancel itself
-			unexpectedStateChange = false;
-		}
-
-		if (unexpectedStateChange) {
-			LOG.error("Unexpected state change: " + oldState + " -> " + newState);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 6bfaf2a..cb7b290 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -16,16 +16,29 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.execution;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.Buffer;
 import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
@@ -43,177 +56,139 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.protocols.AccumulatorProtocol;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.StringUtils;
 
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.FutureTask;
+import com.google.common.base.Preconditions;
+
 
-/**
- * The user code of every Nephele task runs inside a <code>RuntimeEnvironment</code> object. The environment provides
- * important services to the task. It keeps track of setting up the communication channels and provides access to input
- * splits, memory manager, etc.
- * <p/>
- * This class is thread-safe.
- */
 public class RuntimeEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner, Runnable {
 
-	/**
-	 * The log object used for debugging.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(RuntimeEnvironment.class);
+	/** The log object used for debugging. */
+	private static final Log LOG = LogFactory.getLog(RuntimeEnvironment.class);
 
-	/**
-	 * The interval to sleep in case a communication channel is not yet entirely set up (in milliseconds).
-	 */
+	/** The interval to sleep in case a communication channel is not yet entirely set up (in milliseconds). */
 	private static final int SLEEPINTERVAL = 100;
+	
+	// --------------------------------------------------------------------------------------------
 
-	/**
-	 * List of output gates created by the task.
-	 */
-	private final List<OutputGate> outputGates = new CopyOnWriteArrayList<OutputGate>();
-
-	/**
-	 * List of input gates created by the task.
-	 */
-	private final List<InputGate<? extends IOReadableWritable>> inputGates = new CopyOnWriteArrayList<InputGate<? extends IOReadableWritable>>();
-
-	/**
-	 * Queue of unbound input gate IDs which are required for deserializing an environment in the course of an RPC
-	 * call.
-	 */
-	private final Queue<GateID> unboundInputGateIDs = new ArrayDeque<GateID>();
-
-	/**
-	 * The memory manager of the current environment (currently the one associated with the executing TaskManager).
-	 */
-	private final MemoryManager memoryManager;
-
-	/**
-	 * The io manager of the current environment (currently the one associated with the executing TaskManager).
-	 */
-	private final IOManager ioManager;
+	/** The task that owns this environment */
+	private final Task owner;
+	
+	
+	/** The job configuration encapsulated in the environment object. */
+	private final Configuration jobConfiguration;
 
-	/**
-	 * Class of the task to run in this environment.
-	 */
+	/** The task configuration encapsulated in the environment object. */
+	private final Configuration taskConfiguration;
+	
+	
+	/** ClassLoader for all user code classes */
+	private final ClassLoader userCodeClassLoader;
+	
+	/** Class of the task to run in this environment. */
 	private final Class<? extends AbstractInvokable> invokableClass;
 
-	/**
-	 * Instance of the class to be run in this environment.
-	 */
+	/** Instance of the class to be run in this environment. */
 	private final AbstractInvokable invokable;
+	
+	
+	/** List of output gates created by the task. */
+	private final ArrayList<OutputGate> outputGates = new ArrayList<OutputGate>();
 
-	/**
-	 * The ID of the job this task belongs to.
-	 */
-	private final JobID jobID;
+	/** List of input gates created by the task. */
+	private final ArrayList<InputGate<? extends IOReadableWritable>> inputGates = new ArrayList<InputGate<? extends IOReadableWritable>>();
 
-	/**
-	 * The job configuration encapsulated in the environment object.
-	 */
-	private final Configuration jobConfiguration;
+	/** Unbound input gate IDs which are required for deserializing an environment in the course of an RPC call. */
+	private final Queue<GateID> unboundInputGateIDs = new ArrayDeque<GateID>();
 
-	/**
-	 * The task configuration encapsulated in the environment object.
-	 */
-	private final Configuration taskConfiguration;
+	/** The memory manager of the current environment (currently the one associated with the executing TaskManager). */
+	private final MemoryManager memoryManager;
 
-	/**
-	 * The input split provider that can be queried for new input splits.
-	 */
+	/** The I/O manager of the current environment (currently the one associated with the executing TaskManager). */
+	private final IOManager ioManager;
+
+	/** The input split provider that can be queried for new input splits.  */
 	private final InputSplitProvider inputSplitProvider;
 
-	/**
-	 * The observer object for the task's execution.
-	 */
-	private volatile ExecutionObserver executionObserver = null;
 	
-	/**
-	 * The thread executing the task in the environment.
-	 */
+	/** The thread executing the task in the environment. */
 	private volatile Thread executingThread;
 
 	/**
 	 * The RPC proxy to report accumulators to JobManager
 	 */
-	private AccumulatorProtocol accumulatorProtocolProxy = null;
-
-	/**
-	 * The index of this subtask in the subtask group.
-	 */
-	private final int indexInSubtaskGroup;
-
-	/**
-	 * The current number of subtasks the respective task is split into.
-	 */
-	private final int currentNumberOfSubtasks;
-
-	/**
-	 * The name of the task running in this environment.
-	 */
-	private final String taskName;
+	private final AccumulatorProtocol accumulatorProtocolProxy;
 
+	private final Map<String,FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+	
 	private LocalBufferPool outputBufferPool;
-
-	private final Map<String,FutureTask<Path>> cacheCopyTasks;
 	
-	private volatile boolean canceled;
+	private AtomicBoolean canceled = new AtomicBoolean();
+
+
+	public RuntimeEnvironment(Task owner, TaskDeploymentDescriptor tdd,
+							ClassLoader userCodeClassLoader,
+							MemoryManager memoryManager, IOManager ioManager,
+							InputSplitProvider inputSplitProvider,
+							AccumulatorProtocol accumulatorProtocolProxy)
+		throws Exception
+	{
+		Preconditions.checkNotNull(owner);
+		Preconditions.checkNotNull(memoryManager);
+		Preconditions.checkNotNull(ioManager);
+		Preconditions.checkNotNull(inputSplitProvider);
+		Preconditions.checkNotNull(accumulatorProtocolProxy);
+		Preconditions.checkNotNull(userCodeClassLoader);
+		
+		this.owner = owner;
 
-	/**
-	 * Constructs a runtime environment from a task deployment description.
-	 * 
-	 * @param tdd
-	 *        the task deployment description
-	 * @param memoryManager
-	 *        the task manager's memory manager component
-	 * @param ioManager
-	 *        the task manager's I/O manager component
-	 * @param inputSplitProvider
-	 *        the input split provider for this environment
-	 * @throws Exception
-	 *         thrown if an error occurs while instantiating the invokable class
-	 */
-	public RuntimeEnvironment(final TaskDeploymentDescriptor tdd,
-							final MemoryManager memoryManager, final IOManager ioManager,
-							final InputSplitProvider inputSplitProvider,
-							AccumulatorProtocol accumulatorProtocolProxy, Map<String, FutureTask<Path>> cpTasks) throws Exception {
-
-		this.jobID = tdd.getJobID();
-		this.taskName = tdd.getTaskName();
-		this.invokableClass = tdd.getInvokableClass();
-		this.jobConfiguration = tdd.getJobConfiguration();
-		this.taskConfiguration = tdd.getTaskConfiguration();
-		this.indexInSubtaskGroup = tdd.getIndexInSubtaskGroup();
-		this.currentNumberOfSubtasks = tdd.getCurrentNumberOfSubtasks();
 		this.memoryManager = memoryManager;
 		this.ioManager = ioManager;
 		this.inputSplitProvider = inputSplitProvider;
 		this.accumulatorProtocolProxy = accumulatorProtocolProxy;
-		this.cacheCopyTasks = cpTasks;
 
-		this.invokable = this.invokableClass.newInstance();
+		// load and instantiate the invokable class
+		this.userCodeClassLoader = userCodeClassLoader;
+		try {
+			final String className = tdd.getInvokableClassName();
+			this.invokableClass = Class.forName(className, true, userCodeClassLoader).asSubclass(AbstractInvokable.class);
+		}
+		catch (Throwable t) {
+			throw new Exception("Could not load invokable class.", t);
+		}
+		
+		try {
+			this.invokable = this.invokableClass.newInstance();
+		}
+		catch (Throwable t) {
+			throw new Exception("Could not instantiate the invokable class.", t);
+		}
+		
+		this.jobConfiguration = tdd.getJobConfiguration();
+		this.taskConfiguration = tdd.getTaskConfiguration();
+		
 		this.invokable.setEnvironment(this);
 		this.invokable.registerInputOutput();
 
-		int numOutputGates = tdd.getNumberOfOutputGateDescriptors();
-
-		for (int i = 0; i < numOutputGates; ++i) {
-			this.outputGates.get(i).initializeChannels(tdd.getOutputGateDescriptor(i));
+		List<GateDeploymentDescriptor> inGates = tdd.getInputGates();
+		List<GateDeploymentDescriptor> outGates = tdd.getOutputGates();
+		
+		
+		if (this.inputGates.size() != inGates.size()) {
+			throw new Exception("The number of readers created in 'registerInputOutput()' "
+					+ "is different than the number of connected incoming edges in the job graph.");
 		}
-
-		int numInputGates = tdd.getNumberOfInputGateDescriptors();
-
-		for (int i = 0; i < numInputGates; i++) {
-			this.inputGates.get(i).initializeChannels(tdd.getInputGateDescriptor(i));
+		if (this.outputGates.size() != outGates.size()) {
+			throw new Exception("The number of writers created in 'registerInputOutput()' "
+					+ "is different than the number of connected outgoing edges in the job graph.");
+		}
+		
+		for (int i = 0; i < inGates.size(); i++) {
+			this.inputGates.get(i).initializeChannels(inGates.get(i));
+		}
+		for (int i = 0; i < outGates.size(); i++) {
+			this.outputGates.get(i).initializeChannels(outGates.get(i));
 		}
 	}
 
@@ -228,7 +203,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	@Override
 	public JobID getJobID() {
-		return this.jobID;
+		return this.owner.getJobID();
 	}
 
 	@Override
@@ -246,30 +221,24 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	@Override
 	public void run() {
-		if (invokable == null) {
-			LOG.error("ExecutionEnvironment has no Invokable set");
-		}
 
-		// Now the actual program starts to run
-		changeExecutionState(ExecutionState.RUNNING, null);
-
-		// If the task has been canceled in the mean time, do not even start it
-		if (this.executionObserver.isCanceled()) {
-			changeExecutionState(ExecutionState.CANCELED, null);
+		// quick fail in case the task was cancelled while the tread was started
+		if (owner.isCanceled()) {
+			owner.cancelingDone();
 			return;
 		}
-
+		
 		try {
-			ClassLoader cl = LibraryCacheManager.getClassLoader(jobID);
-			Thread.currentThread().setContextClassLoader(cl);
+			Thread.currentThread().setContextClassLoader(userCodeClassLoader);
 			this.invokable.invoke();
 
 			// Make sure, we enter the catch block when the task has been canceled
-			if (this.executionObserver.isCanceled()) {
-				throw new InterruptedException();
+			if (this.owner.isCanceled()) {
+				throw new CancelTaskException();
 			}
-		} catch (Throwable t) {
-			if (!this.executionObserver.isCanceled()) {
+		}
+		catch (Throwable t) {
+			if (!this.owner.isCanceled()) {
 
 				// Perform clean up when the task failed and has been not canceled by the user
 				try {
@@ -282,19 +251,16 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			// Release all resources that may currently be allocated by the individual channels
 			releaseAllChannelResources();
 
-			if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
-				changeExecutionState(ExecutionState.CANCELED, null);
+			if (this.owner.isCanceled() || t instanceof CancelTaskException) {
+				this.owner.cancelingDone();
 			}
 			else {
-				changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));
+				this.owner.markFailed(t);
 			}
 
 			return;
 		}
 
-		// Task finished running, but there may be unconsumed output data in some of the channels
-		changeExecutionState(ExecutionState.FINISHING, null);
-
 		try {
 			// If there is any unclosed input gate, close it and propagate close operation to corresponding output gate
 			closeInputGates();
@@ -307,16 +273,16 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 			// Now we wait until all output channels have written out their data and are closed
 			waitForOutputChannelsToBeClosed();
-		} catch (Throwable t) {
-
+		}
+		catch (Throwable t) {
 			// Release all resources that may currently be allocated by the individual channels
 			releaseAllChannelResources();
 
-			if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
-				changeExecutionState(ExecutionState.CANCELED, null);
+			if (this.owner.isCanceled() || t instanceof CancelTaskException) {
+				this.owner.cancelingDone();
 			}
 			else {
-				changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));
+				this.owner.markFailed(t);
 			}
 
 			return;
@@ -326,7 +292,9 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		releaseAllChannelResources();
 
 		// Finally, switch execution state to FINISHED and report to job manager
-		changeExecutionState(ExecutionState.FINISHED, null);
+		if (!owner.markAsFinished()) {
+			owner.markFailed(new Exception());
+		}
 	}
 
 	@Override
@@ -403,12 +371,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		synchronized (this) {
 
 			if (this.executingThread == null) {
-				if (this.taskName == null) {
-					this.executingThread = new Thread(this);
-				}
-				else {
-					this.executingThread = new Thread(this, getTaskNameWithIndex());
-				}
+				String name = owner.getTaskNameWithSubtasks();
+				this.executingThread = new Thread(this, name);
 			}
 
 			return this.executingThread;
@@ -416,9 +380,11 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	}
 	
 	public void cancelExecution() {
-		canceled = true;
+		if (!canceled.compareAndSet(false, true)) {
+			return;
+		}
 
-		LOG.info("Canceling " + getTaskNameWithIndex());
+		LOG.info("Canceling " + owner.getTaskNameWithSubtasks());
 
 		// Request user code to shut down
 		if (this.invokable != null) {
@@ -429,6 +395,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			}
 		}
 		
+		final Thread executingThread = this.executingThread;
+		
 		// interrupt the running thread and wait for it to die
 		executingThread.interrupt();
 		
@@ -442,10 +410,10 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		
 		// Continuously interrupt the user thread until it changed to state CANCELED
 		while (executingThread != null && executingThread.isAlive()) {
-			LOG.warn("Task " + getTaskName() + " did not react to cancelling signal. Sending repeated interrupt.");
+			LOG.warn("Task " + owner.getTaskNameWithSubtasks() + " did not react to cancelling signal. Sending repeated interrupt.");
 
 			if (LOG.isDebugEnabled()) {
-				StringBuilder bld = new StringBuilder("Task ").append(getTaskName()).append(" is stuck in method:\n");
+				StringBuilder bld = new StringBuilder("Task ").append(owner.getTaskNameWithSubtasks()).append(" is stuck in method:\n");
 				
 				StackTraceElement[] stack = executingThread.getStackTrace();
 				for (StackTraceElement e : stack) {
@@ -465,12 +433,11 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	/**
 	 * Blocks until all output channels are closed.
 	 *
-	 * @throws IOException          thrown if an error occurred while closing the output channels
 	 * @throws InterruptedException thrown if the thread waiting for the channels to be closed is interrupted
 	 */
 	private void waitForOutputChannelsToBeClosed() throws InterruptedException {
 		// Make sure, we leave this method with an InterruptedException when the task has been canceled
-		if (this.executionObserver.isCanceled()) {
+		if (this.owner.isCanceled()) {
 			return;
 		}
 
@@ -487,10 +454,10 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	 */
 	private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
 		// Wait for disconnection of all output gates
-		while (!canceled) {
+		while (!canceled.get()) {
 
 			// Make sure, we leave this method with an InterruptedException when the task has been canceled
-			if (this.executionObserver.isCanceled()) {
+			if (this.owner.isCanceled()) {
 				throw new InterruptedException();
 			}
 
@@ -554,41 +521,17 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	@Override
 	public int getCurrentNumberOfSubtasks() {
-		return this.currentNumberOfSubtasks;
+		return owner.getNumberOfSubtasks();
 	}
 
 	@Override
 	public int getIndexInSubtaskGroup() {
-		return this.indexInSubtaskGroup;
-	}
-
-	private void changeExecutionState(final ExecutionState newExecutionState, final String optionalMessage) {
-		if (this.executionObserver != null) {
-			this.executionObserver.executionStateChanged(newExecutionState, optionalMessage);
-		}
+		return owner.getSubtaskIndex();
 	}
 
 	@Override
 	public String getTaskName() {
-		return this.taskName;
-	}
-
-	/**
-	 * Returns the name of the task with its index in the subtask group and the total number of subtasks.
-	 *
-	 * @return the name of the task with its index in the subtask group and the total number of subtasks
-	 */
-	public String getTaskNameWithIndex() {
-		return String.format("%s (%d/%d)", this.taskName, getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks());
-	}
-
-	/**
-	 * Sets the execution observer for this environment.
-	 *
-	 * @param executionObserver the execution observer for this environment
-	 */
-	public void setExecutionObserver(final ExecutionObserver executionObserver) {
-		this.executionObserver = executionObserver;
+		return owner.getTaskName();
 	}
 
 	@Override
@@ -596,20 +539,6 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		return this.inputSplitProvider;
 	}
 
-	@Override
-	public void userThreadStarted(final Thread userThread) {
-		if (this.executionObserver != null) {
-			this.executionObserver.userThreadStarted(userThread);
-		}
-	}
-
-	@Override
-	public void userThreadFinished(final Thread userThread) {
-		if (this.executionObserver != null) {
-			this.executionObserver.userThreadFinished(userThread);
-		}
-	}
-
 	/**
 	 * Releases the allocated resources (particularly buffer) of input and output channels attached to this task. This
 	 * method should only be called after the respected task has stopped running.
@@ -742,6 +671,10 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		return accumulatorProtocolProxy;
 	}
 
+	public void addCopyTasksForCacheFile(Map<String, FutureTask<Path>> copyTasks) {
+		this.cacheCopyTasks.putAll(copyTasks);
+	}
+	
 	public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {
 		this.cacheCopyTasks.put(name, copyTask);
 	}
@@ -809,7 +742,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	@Override
 	public void logBufferUtilization() {
 		LOG.info(String.format("\t%s: %d available, %d requested, %d designated",
-				getTaskNameWithIndex(),
+				owner.getTaskNameWithSubtasks(),
 				this.outputBufferPool.numAvailableBuffers(),
 				this.outputBufferPool.numRequestedBuffers(),
 				this.outputBufferPool.numDesignatedBuffers()));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index 55bf3f0..1076ede 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -271,7 +271,7 @@ public final class LibraryCacheManager {
 	 * 
 	 * @param id
 	 *        the ID of the job to be registered.
-	 * @param clientPaths
+	 * @param requiredJarFiles
 	 *        the client path's of the required libraries
 	 * @throws IOException
 	 *         thrown if the library cache manager could not be instantiated or one of the requested libraries is not in
@@ -290,7 +290,7 @@ public final class LibraryCacheManager {
 	 * 
 	 * @param id
 	 *        the ID of the job to be registered.
-	 * @param clientPaths
+	 * @param requiredJarFiles
 	 *        the client path's of the required libraries
 	 * @throws IOException
 	 *         thrown if one of the requested libraries is not in the cache
@@ -438,8 +438,6 @@ public final class LibraryCacheManager {
 	 *        the ID of the job to return the class loader for
 	 * @return the class loader of requested vertex or <code>null</code> if no class loader has been registered with the
 	 *         given ID.
-	 * @throws IOException
-	 *         thrown if the library cache manager could not be instantiated
 	 */
 	private ClassLoader getClassLoaderInternal(final JobID id) {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheUpdate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheUpdate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheUpdate.java
index 4310bfb..2cfbe58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheUpdate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheUpdate.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.execution.librarycache;
 
 import java.io.IOException;
@@ -26,15 +25,12 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * This class is used to encapsulate the transmission of a library file in a Nephele RPC call.
- * 
+ * This class is used to encapsulate the transmission of a library file in a RPC call.
  */
 public class LibraryCacheUpdate implements IOReadableWritable {
 
-	/**
-	 * The name of the library file that is transmitted with this object.
-	 */
-	private String libraryFileName = null;
+	/** The name of the library file that is transmitted with this object. */
+	private String libraryFileName;
 
 	/**
 	 * Constructs a new library cache update object.
@@ -42,32 +38,32 @@ public class LibraryCacheUpdate implements IOReadableWritable {
 	 * @param libraryFileName
 	 *        the name of the library that should be transported within this object.
 	 */
-	public LibraryCacheUpdate(final String libraryFileName) {
+	public LibraryCacheUpdate(String libraryFileName) {
+		if (libraryFileName == null) {
+			throw new IllegalArgumentException("libraryFileName must not be null");
+		}
+		
 		this.libraryFileName = libraryFileName;
 	}
 
 	/**
 	 * Constructor used to reconstruct the object at the receiver of the RPC call.
 	 */
-	public LibraryCacheUpdate() {
-	}
+	public LibraryCacheUpdate() {}
 
 
 	@Override
-	public void read(final DataInputView in) throws IOException {
-
+	public void read(DataInputView in) throws IOException {
 		LibraryCacheManager.readLibraryFromStream(in);
 	}
 
 
 	@Override
-	public void write(final DataOutputView out) throws IOException {
-
+	public void write(DataOutputView out) throws IOException {
 		if (this.libraryFileName == null) {
 			throw new IOException("libraryFileName is null");
 		}
 
 		LibraryCacheManager.writeLibraryToStream(this.libraryFileName, out);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
new file mode 100644
index 0000000..84781cb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+class AllVerticesIterator implements Iterator<ExecutionVertex2> {
+
+	private final Iterator<ExecutionJobVertex> jobVertices;
+	
+	private ExecutionVertex2[] currVertices;
+	
+	private int currPos;
+	
+	
+	public AllVerticesIterator(Iterator<ExecutionJobVertex> jobVertices) {
+		this.jobVertices = jobVertices;
+	}
+	
+	
+	@Override
+	public boolean hasNext() {
+		while (true) {
+			if (currVertices != null) {
+				if (currPos < currVertices.length) {
+					return true;
+				} else {
+					currVertices = null;
+				}
+			}
+			else if (jobVertices.hasNext()) {
+				currVertices = jobVertices.next().getTaskVertices();
+				currPos = 0;
+			}
+			else {
+				return false;
+			}
+		}
+	}
+	
+	@Override
+	public ExecutionVertex2 next() {
+		if (hasNext()) {
+			return currVertices[currPos++];
+		} else {
+			throw new NoSuchElementException();
+		}
+	}
+	
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DistributionPatternProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DistributionPatternProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DistributionPatternProvider.java
deleted file mode 100644
index 610b294..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DistributionPatternProvider.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-
-public final class DistributionPatternProvider {
-
-	/**
-	 * Checks if two subtasks of different tasks should be wired.
-	 * 
-	 * @param pattern
-	 *        the distribution pattern that should be used
-	 * @param nodeLowerStage
-	 *        the index of the producing task's subtask
-	 * @param nodeUpperStage
-	 *        the index of the consuming task's subtask
-	 * @param sizeSetLowerStage
-	 *        the number of subtasks of the producing task
-	 * @param sizeSetUpperStage
-	 *        the number of subtasks of the consuming task
-	 * @return <code>true</code> if a wire between the two considered subtasks should be created, <code>false</code>
-	 *         otherwise
-	 */
-	public static boolean createWire(final DistributionPattern pattern, final int nodeLowerStage,
-			final int nodeUpperStage, final int sizeSetLowerStage, final int sizeSetUpperStage) {
-
-		switch (pattern) {
-		case BIPARTITE:
-			return true;
-
-		case POINTWISE:
-			if (sizeSetLowerStage < sizeSetUpperStage) {
-				if (nodeLowerStage == (nodeUpperStage % sizeSetLowerStage)) {
-					return true;
-				}
-			} else {
-				if ((nodeLowerStage % sizeSetUpperStage) == nodeUpperStage) {
-					return true;
-				}
-			}
-
-			return false;
-
-			/*
-			 * case STAR:
-			 * if (sizeSetLowerStage > sizeSetUpperStage) {
-			 * int groupNumber = nodeLowerStage / Math.max(sizeSetLowerStage / sizeSetUpperStage, 1);
-			 * if (nodeUpperStage == groupNumber) {
-			 * return true;
-			 * }
-			 * } else {
-			 * int groupNumber = nodeUpperStage / Math.max(sizeSetUpperStage / sizeSetLowerStage, 1);
-			 * if (nodeLowerStage == groupNumber) {
-			 * return true;
-			 * }
-			 * }
-			 * return false;
-			 */
-
-		default:
-			// this will never happen.
-			throw new IllegalStateException("No Match for Distribution Pattern found.");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
new file mode 100644
index 0000000..18e57f0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.AbstractID;
+
+/**
+ * Unique identifier for the attempt to execute a tasks. Multiple attempts happen
+ * in cases of failures and recovery.
+ */
+public class ExecutionAttemptID extends AbstractID {
+	
+	private static final long serialVersionUID = -1169683445778281344L;
+}