You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:02 UTC

[38/63] [abbrv] Finalize ExecutionGraph state machine and calls

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index adc0f09..bbc0c97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -16,992 +16,388 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.executiongraph;
 
-import java.io.IOException;
+import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
+import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
+import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
+
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
+
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionListener;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.ExecutionStateTransition;
-import org.apache.flink.runtime.instance.AllocatedResource;
-import org.apache.flink.runtime.instance.AllocationID;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.taskmanager.TaskOperationResult;
-import org.apache.flink.runtime.taskmanager.TaskCancelResult;
-import org.apache.flink.runtime.taskmanager.TaskKillResult;
-import org.apache.flink.runtime.taskmanager.TaskSubmissionResult;
-import org.apache.flink.runtime.taskmanager.TaskOperationResult.ReturnCode;
-import org.apache.flink.runtime.util.AtomicEnum;
-import org.apache.flink.runtime.util.SerializableArrayList;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 
 /**
- * An execution vertex represents an instance of a task in a Nephele job. An execution vertex
- * is initially created from a job vertex and always belongs to exactly one group vertex.
- * It is possible to duplicate execution vertices in order to distribute a task to several different
- * task managers and process the task in parallel.
- * <p>
- * This class is thread-safe.
- * 
+ * The ExecutionVertex is a parallel subtask of the execution. It may be executed once or several times, and
+ * each time it is executed it spawns an {@link Execution}.
  */
-public final class ExecutionVertex {
-
-	/**
-	 * The log object used for debugging.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(ExecutionVertex.class);
-
-	/**
-	 * The ID of the vertex.
-	 */
-	private final ExecutionVertexID vertexID;
-
-	/**
-	 * The group vertex this vertex belongs to.
-	 */
-	private final ExecutionGroupVertex groupVertex;
-
-	/**
-	 * The execution graph is vertex belongs to.
-	 */
-	private final ExecutionGraph executionGraph;
-
-	/**
-	 * The allocated resources assigned to this vertex.
-	 */
-	private final AtomicReference<AllocatedResource> allocatedResource = new AtomicReference<AllocatedResource>(null);
-
-	/**
-	 * The allocation ID identifying the allocated resources used by this vertex
-	 * within the instance.
-	 */
-	private volatile AllocationID allocationID = null;
-
-	/**
-	 * A list of {@link VertexAssignmentListener} objects to be notified about changes in the instance assignment.
-	 */
-	private final CopyOnWriteArrayList<VertexAssignmentListener> vertexAssignmentListeners = new CopyOnWriteArrayList<VertexAssignmentListener>();
-
-	/**
-	 * A map of {@link ExecutionListener} objects to be notified about the state changes of a vertex.
-	 */
-	private final ConcurrentMap<Integer, ExecutionListener> executionListeners = new ConcurrentSkipListMap<Integer, ExecutionListener>();
-
-	/**
-	 * The current execution state of the task represented by this vertex
-	 */
-	private final AtomicEnum<ExecutionState> executionState = new AtomicEnum<ExecutionState>(ExecutionState.CREATED);
-
-	/**
-	 * The output gates attached to this vertex.
-	 */
-	private final ExecutionGate[] outputGates;
-
-	/**
-	 * The input gates attached to his vertex.
-	 */
-	private final ExecutionGate[] inputGates;
-
-	/**
-	 * The index of this vertex in the vertex group.
-	 */
-	private volatile int indexInVertexGroup = 0;
-
-	/**
-	 * Stores the number of times the vertex may be still be started before the corresponding task is considered to be
-	 * failed.
-	 */
-	private final AtomicInteger retriesLeft;
-
-
-	/**
-	 * The execution pipeline this vertex is part of.
-	 */
-	private final AtomicReference<ExecutionPipeline> executionPipeline = new AtomicReference<ExecutionPipeline>(null);
-
-	/**
-	 * Flag to indicate whether the vertex has been requested to cancel while in state STARTING
-	 */
-	private final AtomicBoolean cancelRequested = new AtomicBoolean(false);
-
-	/**
-	 * Create a new execution vertex and instantiates its environment.
-	 * 
-	 * @param executionGraph
-	 *        the execution graph the new vertex belongs to
-	 * @param groupVertex
-	 *        the group vertex the new vertex belongs to
-	 * @param numberOfOutputGates
-	 *        the number of output gates attached to this vertex
-	 * @param numberOfInputGates
-	 *        the number of input gates attached to this vertex
-	 */
-	public ExecutionVertex(final ExecutionGraph executionGraph, final ExecutionGroupVertex groupVertex,
-			final int numberOfOutputGates, final int numberOfInputGates) {
-		this(new ExecutionVertexID(), executionGraph, groupVertex, numberOfOutputGates, numberOfInputGates);
-
-		this.groupVertex.addInitialSubtask(this);
-	}
-
-	/**
-	 * Private constructor used to duplicate execution vertices.
-	 * 
-	 * @param vertexID
-	 *        the ID of the new execution vertex.
-	 * @param executionGraph
-	 *        the execution graph the new vertex belongs to
-	 * @param groupVertex
-	 *        the group vertex the new vertex belongs to
-	 * @param numberOfOutputGates
-	 *        the number of output gates attached to this vertex
-	 * @param numberOfInputGates
-	 *        the number of input gates attached to this vertex
-	 */
-	private ExecutionVertex(final ExecutionVertexID vertexID, final ExecutionGraph executionGraph,
-			final ExecutionGroupVertex groupVertex, final int numberOfOutputGates, final int numberOfInputGates) {
-
-		this.vertexID = vertexID;
-		this.executionGraph = executionGraph;
-		this.groupVertex = groupVertex;
-
-		this.retriesLeft = new AtomicInteger(groupVertex.getNumberOfExecutionRetries());
-
-		this.outputGates = new ExecutionGate[numberOfOutputGates];
-		this.inputGates = new ExecutionGate[numberOfInputGates];
+public class ExecutionVertex {
 
-		// Register vertex with execution graph
-		this.executionGraph.registerExecutionVertex(this);
-
-		// Register the vertex itself as a listener for state changes
-		registerExecutionListener(this.executionGraph);
-	}
-
-	/**
-	 * Returns the group vertex this execution vertex belongs to.
-	 * 
-	 * @return the group vertex this execution vertex belongs to
-	 */
-	public ExecutionGroupVertex getGroupVertex() {
-		return this.groupVertex;
-	}
+	@SuppressWarnings("unused")
+	private static final Logger LOG = ExecutionGraph.LOG;
+	
+	private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private final ExecutionJobVertex jobVertex;
+	
+	private final IntermediateResultPartition[] resultPartitions;
+	
+	private final ExecutionEdge[][] inputEdges;
+	
+	private final int subTaskIndex;
+	
+	private final List<Execution> priorExecutions;
+	
+	private volatile Execution currentExecution;	// this field must never be null
+	
+	// --------------------------------------------------------------------------------------------
 
-	/**
-	 * Returns the name of the execution vertex.
-	 * 
-	 * @return the name of the execution vertex
-	 */
-	public String getName() {
-		return this.groupVertex.getName();
+	public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets) {
+		this(jobVertex, subTaskIndex, producedDataSets, System.currentTimeMillis());
 	}
-
-	/**
-	 * Returns a duplicate of this execution vertex.
-	 * 
-	 * @param preserveVertexID
-	 *        <code>true</code> to copy the vertex's ID to the duplicated vertex, <code>false</code> to create a new ID
-	 * @return a duplicate of this execution vertex
-	 */
-	public ExecutionVertex duplicateVertex(final boolean preserveVertexID) {
-
-		ExecutionVertexID newVertexID;
-		if (preserveVertexID) {
-			newVertexID = this.vertexID;
-		} else {
-			newVertexID = new ExecutionVertexID();
-		}
-
-		final ExecutionVertex duplicatedVertex = new ExecutionVertex(newVertexID, this.executionGraph,
-			this.groupVertex, this.outputGates.length, this.inputGates.length);
-
-		// Duplicate gates
-		for (int i = 0; i < this.outputGates.length; ++i) {
-			duplicatedVertex.outputGates[i] = new ExecutionGate(new GateID(), duplicatedVertex,
-				this.outputGates[i].getGroupEdge(), false);
-		}
-
-		for (int i = 0; i < this.inputGates.length; ++i) {
-			duplicatedVertex.inputGates[i] = new ExecutionGate(new GateID(), duplicatedVertex,
-				this.inputGates[i].getGroupEdge(), true);
+	
+	public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets, long createTimestamp) {
+		this.jobVertex = jobVertex;
+		this.subTaskIndex = subTaskIndex;
+		
+		this.resultPartitions = new IntermediateResultPartition[producedDataSets.length];
+		for (int i = 0; i < producedDataSets.length; i++) {
+			IntermediateResultPartition irp = new IntermediateResultPartition(producedDataSets[i], this, subTaskIndex);
+			this.resultPartitions[i] = irp;
+			producedDataSets[i].setPartition(subTaskIndex, irp);
 		}
-
-		// TODO set new profiling record with new vertex id
-		duplicatedVertex.setAllocatedResource(this.allocatedResource.get());
-
-		return duplicatedVertex;
+		
+		this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
+		this.priorExecutions = new CopyOnWriteArrayList<Execution>();
+		
+		this.currentExecution = new Execution(this, 0, createTimestamp);
 	}
-
-	/**
-	 * Inserts the output gate at the given position.
-	 * 
-	 * @param pos
-	 *        the position to insert the output gate
-	 * @param outputGate
-	 *        the output gate to be inserted
-	 */
-	void insertOutputGate(final int pos, final ExecutionGate outputGate) {
-
-		if (this.outputGates[pos] != null) {
-			throw new IllegalStateException("Output gate at position " + pos + " is not null");
-		}
-
-		this.outputGates[pos] = outputGate;
+	
+	
+	// --------------------------------------------------------------------------------------------
+	//  Properties
+	// --------------------------------------------------------------------------------------------
+	
+	public JobID getJobId() {
+		return this.jobVertex.getJobId();
 	}
-
-	/**
-	 * Inserts the input gate at the given position.
-	 * 
-	 * @param pos
-	 *        the position to insert the input gate
-	 * @param outputGate
-	 *        the input gate to be inserted
-	 */
-	void insertInputGate(final int pos, final ExecutionGate inputGate) {
-
-		if (this.inputGates[pos] != null) {
-			throw new IllegalStateException("Input gate at position " + pos + " is not null");
-		}
-
-		this.inputGates[pos] = inputGate;
+	
+	public ExecutionJobVertex getJobVertex() {
+		return jobVertex;
 	}
-
-	/**
-	 * Returns a duplicate of this execution vertex. The duplicated vertex receives
-	 * a new vertex ID.
-	 * 
-	 * @return a duplicate of this execution vertex.
-	 */
-	public ExecutionVertex splitVertex() {
-
-		return duplicateVertex(false);
+	
+	public JobVertexID getJobvertexId() {
+		return this.jobVertex.getJobVertexId();
 	}
-
-	/**
-	 * Returns this execution vertex's current execution status.
-	 * 
-	 * @return this execution vertex's current execution status
-	 */
-	public ExecutionState getExecutionState() {
-		return this.executionState.get();
+	
+	public String getTaskName() {
+		return this.jobVertex.getJobVertex().getName();
 	}
-
-	/**
-	 * Updates the vertex's current execution state through the job's executor service.
-	 * 
-	 * @param newExecutionState
-	 *        the new execution state
-	 * @param optionalMessage
-	 *        an optional message related to the state change
-	 */
-	public void updateExecutionStateAsynchronously(final ExecutionState newExecutionState,
-			final String optionalMessage) {
-
-		final Runnable command = new Runnable() {
-
-			/**
-			 * {@inheritDoc}
-			 */
-			@Override
-			public void run() {
-
-				updateExecutionState(newExecutionState, optionalMessage);
-			}
-		};
-
-		this.executionGraph.executeCommand(command);
+	
+	public int getTotalNumberOfParallelSubtasks() {
+		return this.jobVertex.getParallelism();
 	}
-
-	/**
-	 * Updates the vertex's current execution state through the job's executor service.
-	 * 
-	 * @param newExecutionState
-	 *        the new execution state
-	 */
-	public void updateExecutionStateAsynchronously(final ExecutionState newExecutionState) {
-
-		updateExecutionStateAsynchronously(newExecutionState, null);
+	
+	public int getParallelSubtaskIndex() {
+		return this.subTaskIndex;
 	}
-
-	/**
-	 * Updates the vertex's current execution state.
-	 * 
-	 * @param newExecutionState
-	 *        the new execution state
-	 */
-	public ExecutionState updateExecutionState(final ExecutionState newExecutionState) {
-		return updateExecutionState(newExecutionState, null);
+	
+	public int getNumberOfInputs() {
+		return this.inputEdges.length;
 	}
-
-	/**
-	 * Updates the vertex's current execution state.
-	 * 
-	 * @param newExecutionState
-	 *        the new execution state
-	 * @param optionalMessage
-	 *        an optional message related to the state change
-	 */
-	public ExecutionState updateExecutionState(ExecutionState newExecutionState, final String optionalMessage) {
-
-		if (newExecutionState == null) {
-			throw new IllegalArgumentException("Argument newExecutionState must not be null");
-		}
-
-		final ExecutionState currentExecutionState = this.executionState.get();
-		if (currentExecutionState == ExecutionState.CANCELING) {
-
-			// If we are in CANCELING, ignore state changes to FINISHING
-			if (newExecutionState == ExecutionState.FINISHING) {
-				return currentExecutionState;
-			}
-
-			// Rewrite FINISHED to CANCELED if the task has been marked to be canceled
-			if (newExecutionState == ExecutionState.FINISHED) {
-				LOG.info("Received transition from CANCELING to FINISHED for vertex " + toString()
-					+ ", converting it to CANCELED");
-				newExecutionState = ExecutionState.CANCELED;
-			}
-		}
-
-		// Check and save the new execution state
-		final ExecutionState previousState = this.executionState.getAndSet(newExecutionState);
-		if (previousState == newExecutionState) {
-			return previousState;
-		}
-
-		// Check the transition
-		ExecutionStateTransition.checkTransition(true, toString(), previousState, newExecutionState);
-
-		// Notify the listener objects
-		final Iterator<ExecutionListener> it = this.executionListeners.values().iterator();
-		while (it.hasNext()) {
-			it.next().executionStateChanged(this.executionGraph.getJobID(), this.vertexID, newExecutionState,
-				optionalMessage);
+	
+	public ExecutionEdge[] getInputEdges(int input) {
+		if (input < 0 || input >= this.inputEdges.length) {
+			throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, this.inputEdges.length));
 		}
-
-		// The vertex was requested to be canceled by another thread
-		checkCancelRequestedFlag();
-
-		return previousState;
+		return inputEdges[input];
 	}
-
-	public boolean compareAndUpdateExecutionState(final ExecutionState expected, final ExecutionState update) {
-
-		if (update == null) {
-			throw new IllegalArgumentException("Argument update must not be null");
-		}
-
-		if (!this.executionState.compareAndSet(expected, update)) {
-			return false;
-		}
-
-		// Check the transition
-		ExecutionStateTransition.checkTransition(true, toString(), expected, update);
-
-		// Notify the listener objects
-		final Iterator<ExecutionListener> it = this.executionListeners.values().iterator();
-		while (it.hasNext()) {
-			it.next().executionStateChanged(this.executionGraph.getJobID(), this.vertexID, update,
-				null);
-		}
-
-		// Check if the vertex was requested to be canceled by another thread
-		checkCancelRequestedFlag();
-
-		return true;
+	
+	public Execution getCurrentExecutionAttempt() {
+		return currentExecution;
 	}
-
-	/**
-	 * Checks if another thread requested the vertex to cancel while it was in state STARTING. If so, the method clears
-	 * the respective flag and repeats the cancel request.
-	 */
-	private void checkCancelRequestedFlag() {
-
-		if (this.cancelRequested.compareAndSet(true, false)) {
-			final TaskCancelResult tsr = cancelTask();
-			if (tsr.getReturnCode() != AbstractTaskResult.TaskOperationResult.SUCCESS
-				&& tsr.getReturnCode() != AbstractTaskResult.TaskOperationResult.TASK_NOT_FOUND) {
-				LOG.error("Unable to cancel vertex " + this + ": " + tsr.getReturnCode().toString()
-					+ ((tsr.getDescription() != null) ? (" (" + tsr.getDescription() + ")") : ""));
-			}
-		}
+	
+	public ExecutionState getExecutionState() {
+		return currentExecution.getState();
 	}
-
-	/**
-	 * Assigns the execution vertex with an {@link AllocatedResource}.
-	 * 
-	 * @param allocatedResource
-	 *        the resources which are supposed to be allocated to this vertex
-	 */
-	public void setAllocatedResource(final AllocatedResource allocatedResource) {
-
-		if (allocatedResource == null) {
-			throw new IllegalArgumentException("Argument allocatedResource must not be null");
-		}
-
-		final AllocatedResource previousResource = this.allocatedResource.getAndSet(allocatedResource);
-		if (previousResource != null) {
-			previousResource.removeVertexFromResource(this);
-		}
-
-		allocatedResource.assignVertexToResource(this);
-
-		// Notify all listener objects
-		final Iterator<VertexAssignmentListener> it = this.vertexAssignmentListeners.iterator();
-		while (it.hasNext()) {
-			it.next().vertexAssignmentChanged(this.vertexID, allocatedResource);
-		}
+	
+	public long getStateTimestamp(ExecutionState state) {
+		return currentExecution.getStateTimestamp(state);
 	}
-
-	/**
-	 * Returns the allocated resources assigned to this execution vertex.
-	 * 
-	 * @return the allocated resources assigned to this execution vertex
-	 */
-	public AllocatedResource getAllocatedResource() {
-
-		return this.allocatedResource.get();
+	
+	public Throwable getFailureCause() {
+		return currentExecution.getFailureCause();
 	}
-
-	/**
-	 * Returns the allocation ID which identifies the resources used
-	 * by this vertex within the assigned instance.
-	 * 
-	 * @return the allocation ID which identifies the resources used
-	 *         by this vertex within the assigned instance or <code>null</code> if the instance is still assigned to a
-	 *         {@link org.apache.flink.runtime.instance.DummyInstance}.
-	 */
-	public AllocationID getAllocationID() {
-		return this.allocationID;
+	
+	public AllocatedSlot getCurrentAssignedResource() {
+		return currentExecution.getAssignedResource();
 	}
-
-	/**
-	 * Returns the ID of this execution vertex.
-	 * 
-	 * @return the ID of this execution vertex
-	 */
-	public ExecutionVertexID getID() {
-		return this.vertexID;
+	
+	public ExecutionGraph getExecutionGraph() {
+		return this.jobVertex.getGraph();
 	}
-
-	/**
-	 * Returns the number of predecessors, i.e. the number of vertices
-	 * which connect to this vertex.
-	 * 
-	 * @return the number of predecessors
-	 */
-	public int getNumberOfPredecessors() {
-
-		int numberOfPredecessors = 0;
-
-		for (int i = 0; i < this.inputGates.length; ++i) {
-			numberOfPredecessors += this.inputGates[i].getNumberOfEdges();
+	
+	// --------------------------------------------------------------------------------------------
+	//  Graph building
+	// --------------------------------------------------------------------------------------------
+	
+	public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
+		
+		final DistributionPattern pattern = edge.getDistributionPattern();
+		final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
+		
+		ExecutionEdge[] edges = null;
+		
+		switch (pattern) {
+			case POINTWISE:
+				edges = connectPointwise(sourcePartitions, inputNumber);
+				break;
+				
+			case BIPARTITE: 
+				edges = connectAllToAll(sourcePartitions, inputNumber);
+				break;
+				
+			default:
+				throw new RuntimeException("Unrecognized distribution pattern.");
+		
 		}
-
-		return numberOfPredecessors;
-	}
-
-	/**
-	 * Returns the number of successors, i.e. the number of vertices
-	 * this vertex is connected to.
-	 * 
-	 * @return the number of successors
-	 */
-	public int getNumberOfSuccessors() {
-
-		int numberOfSuccessors = 0;
-
-		for (int i = 0; i < this.outputGates.length; ++i) {
-			numberOfSuccessors += this.outputGates[i].getNumberOfEdges();
+		
+		this.inputEdges[inputNumber] = edges;
+		
+		ExecutionGraph graph = getExecutionGraph();
+		
+		// add the consumers to the source
+		// for now (until the receiver initiated handshake is in place), we need to register the 
+		// edges with the execution graph
+		for (ExecutionEdge ee : edges) {
+			ee.getSource().addConsumer(ee, consumerNumber);
+			graph.registerExecutionEdge(ee);
 		}
-
-		return numberOfSuccessors;
 	}
-
-	public ExecutionVertex getPredecessor(int index) {
-
-		if (index < 0) {
-			throw new IllegalArgumentException("Argument index must be greather or equal to 0");
-		}
-
-		for (int i = 0; i < this.inputGates.length; ++i) {
-
-			final ExecutionGate inputGate = this.inputGates[i];
-			final int numberOfEdges = inputGate.getNumberOfEdges();
-
-			if (index >= 0 && index < numberOfEdges) {
-
-				final ExecutionEdge edge = inputGate.getEdge(index);
-				return edge.getOutputGate().getVertex();
-			}
-			index -= numberOfEdges;
+	
+	private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
+		ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
+		
+		for (int i = 0; i < sourcePartitions.length; i++) {
+			IntermediateResultPartition irp = sourcePartitions[i];
+			edges[i] = new ExecutionEdge(irp, this, inputNumber);
 		}
-
-		return null;
+		
+		return edges;
 	}
-
-	public ExecutionVertex getSuccessor(int index) {
-
-		if (index < 0) {
-			throw new IllegalArgumentException("Argument index must be greather or equal to 0");
-		}
-
-		for (int i = 0; i < this.outputGates.length; ++i) {
-
-			final ExecutionGate outputGate = this.outputGates[i];
-			final int numberOfEdges = outputGate.getNumberOfEdges();
-
-			if (index >= 0 && index < numberOfEdges) {
-
-				final ExecutionEdge edge = outputGate.getEdge(index);
-				return edge.getInputGate().getVertex();
+	
+	private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
+		final int numSources = sourcePartitions.length;
+		final int parallelism = getTotalNumberOfParallelSubtasks();
+		
+		// simple case: same number of sources as targets
+		if (numSources == parallelism) {
+			return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
+		}
+		else if (numSources < parallelism) {
+			
+			int sourcePartition;
+			
+			// check if the pattern is regular or irregular
+			// we use integer arithmetic for the regular case, and floating point truncated to int for the irregular case
+			if (parallelism % numSources == 0) {
+				// same number of targets per source
+				int factor = parallelism / numSources;
+				sourcePartition = subTaskIndex / factor;
+			}
+			else {
+				// different number of targets per source
+				float factor = ((float) parallelism) / numSources;
+				sourcePartition = (int) (subTaskIndex / factor);
+			}
+			
+			return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) };
+		}
+		else {
+			if (numSources % parallelism == 0) {
+				// same number of sources per target
+				int factor = numSources / parallelism;
+				int startIndex = subTaskIndex * factor;
+				
+				ExecutionEdge[] edges = new ExecutionEdge[factor];
+				for (int i = 0; i < factor; i++) {
+					edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber);
+				}
+				return edges;
+			}
+			else {
+				float factor = ((float) numSources) / parallelism;
+				
+				int start = (int) (subTaskIndex * factor);
+				int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ?
+						sourcePartitions.length : 
+						(int) ((subTaskIndex + 1) * factor);
+				
+				ExecutionEdge[] edges = new ExecutionEdge[end - start];
+				for (int i = 0; i < edges.length; i++) {
+					edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber);
+				}
+				
+				return edges;
 			}
-			index -= numberOfEdges;
 		}
-
-		return null;
-
-	}
-
-	/**
-	 * Checks if this vertex is an input vertex in its stage, i.e. has either no
-	 * incoming connections or only incoming connections to group vertices in a lower stage.
-	 * 
-	 * @return <code>true</code> if this vertex is an input vertex, <code>false</code> otherwise
-	 */
-	public boolean isInputVertex() {
-
-		return this.groupVertex.isInputVertex();
-	}
-
-	/**
-	 * Checks if this vertex is an output vertex in its stage, i.e. has either no
-	 * outgoing connections or only outgoing connections to group vertices in a higher stage.
-	 * 
-	 * @return <code>true</code> if this vertex is an output vertex, <code>false</code> otherwise
-	 */
-	public boolean isOutputVertex() {
-
-		return this.groupVertex.isOutputVertex();
-	}
-
-	/**
-	 * Returns the index of this vertex in the vertex group.
-	 * 
-	 * @return the index of this vertex in the vertex group
-	 */
-	public int getIndexInVertexGroup() {
-
-		return this.indexInVertexGroup;
-	}
-
-	/**
-	 * Sets the vertex' index in the vertex group.
-	 * 
-	 * @param indexInVertexGroup
-	 *        the vertex' index in the vertex group
-	 */
-	void setIndexInVertexGroup(final int indexInVertexGroup) {
-
-		this.indexInVertexGroup = indexInVertexGroup;
-	}
-
-	/**
-	 * Returns the number of output gates attached to this vertex.
-	 * 
-	 * @return the number of output gates attached to this vertex
-	 */
-	public int getNumberOfOutputGates() {
-
-		return this.outputGates.length;
 	}
 
 	/**
-	 * Returns the output gate with the given index.
+	 * Gets the location preferences of this task, determined by the locations of the predecessors from which
+	 * it receives input data.
+	 * If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this
+	 * method returns {@code null} to indicate no location preference.
 	 * 
-	 * @param index
-	 *        the index of the output gate to return
-	 * @return the output gate with the given index
-	 */
-	public ExecutionGate getOutputGate(final int index) {
-
-		return this.outputGates[index];
-	}
-
-	/**
-	 * Returns the number of input gates attached to this vertex.
-	 * 
-	 * @return the number of input gates attached to this vertex
-	 */
-	public int getNumberOfInputGates() {
-
-		return this.inputGates.length;
-	}
-
-	/**
-	 * Returns the input gate with the given index.
-	 * 
-	 * @param index
-	 *        the index of the input gate to return
-	 * @return the input gate with the given index
-	 */
-	public ExecutionGate getInputGate(final int index) {
-
-		return this.inputGates[index];
-	}
-
-	/**
-	 * Deploys and starts the task represented by this vertex
-	 * on the assigned instance.
-	 * 
-	 * @return the result of the task submission attempt
-	 */
-	public TaskSubmissionResult startTask() {
-
-		final AllocatedResource ar = this.allocatedResource.get();
-
-		if (ar == null) {
-			final TaskSubmissionResult result = new TaskSubmissionResult(getID(),
-				AbstractTaskResult.TaskOperationResult.NO_INSTANCE);
-			result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
-			return result;
-		}
-
-		final List<TaskDeploymentDescriptor> tasks = new SerializableArrayList<TaskDeploymentDescriptor>();
-		tasks.add(constructDeploymentDescriptor());
-
-		try {
-			final List<TaskSubmissionResult> results = ar.getInstance().submitTasks(tasks);
-
-			return results.get(0);
-
-		} catch (IOException e) {
-			final TaskSubmissionResult result = new TaskSubmissionResult(getID(),
-				AbstractTaskResult.TaskOperationResult.IPC_ERROR);
-			result.setDescription(StringUtils.stringifyException(e));
-			return result;
-		}
-	}
-
-	/**
-	 * Kills and removes the task represented by this vertex from the instance it is currently running on. If the
-	 * corresponding task is not in the state <code>RUNNING</code>, this call will be ignored. If the call has been
-	 * executed
-	 * successfully, the task will change the state <code>FAILED</code>.
-	 *
-	 * @return the result of the task kill attempt
+	 * @return The preferred locations for this vertex execution, or null, if there is no preference.
 	 */
-	public TaskKillResult killTask() {
-
-		final ExecutionState state = this.executionState.get();
-
-		if (state != ExecutionState.RUNNING) {
-			final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.TaskOperationResult.ILLEGAL_STATE);
-			result.setDescription("Vertex " + this.toString() + " is in state " + state);
-			return result;
-		}
-
-		final AllocatedResource ar = this.allocatedResource.get();
-
-		if (ar == null) {
-			final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.TaskOperationResult.NO_INSTANCE);
-			result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
-			return result;
-		}
-
-		try {
-			return ar.getInstance().killTask(this.vertexID);
-		} catch (IOException e) {
-			final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.TaskOperationResult.IPC_ERROR);
-			result.setDescription(StringUtils.stringifyException(e));
-			return result;
+	public Iterable<Instance> getPreferredLocations() {
+		HashSet<Instance> locations = new HashSet<Instance>();
+		
+		for (int i = 0; i < inputEdges.length; i++) {
+			ExecutionEdge[] sources = inputEdges[i];
+			if (sources != null) {
+				for (int k = 0; k < sources.length; k++) {
+					Instance source = sources[k].getSource().getProducer().getCurrentAssignedResource().getInstance();
+					locations.add(source);
+					if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+						return null;
+					}
+				}
+			}
 		}
+		return locations;
 	}
-
-	/**
-	 * Cancels and removes the task represented by this vertex
-	 * from the instance it is currently running on. If the task
-	 * is not currently running, its execution state is simply
-	 * updated to <code>CANCELLED</code>.
-	 * 
-	 * @return the result of the task cancel attempt
-	 */
-	public TaskCancelResult cancelTask() {
-
-		while (true) {
-
-			final ExecutionState previousState = this.executionState.get();
-
-			if (previousState == ExecutionState.CANCELED) {
-				return new TaskCancelResult(getID(), AbstractTaskResult.TaskOperationResult.SUCCESS);
-			}
-
-			if (previousState == ExecutionState.FAILED) {
-				return new TaskCancelResult(getID(), AbstractTaskResult.TaskOperationResult.SUCCESS);
-			}
-
-			if (previousState == ExecutionState.FINISHED) {
-				return new TaskCancelResult(getID(), AbstractTaskResult.TaskOperationResult.SUCCESS);
+	
+	// --------------------------------------------------------------------------------------------
+	//   Actions
+	// --------------------------------------------------------------------------------------------
+	
+	public void resetForNewExecution() {
+		synchronized (priorExecutions) {
+			Execution execution = currentExecution;
+			ExecutionState state = execution.getState();
+			
+			if (state == FINISHED || state == CANCELED || state == FAILED) {
+				priorExecutions.add(execution);
+				currentExecution = new Execution(this, execution.getAttemptNumber()+1, System.currentTimeMillis());
 			}
-
-			// The vertex has already received a cancel request
-			if (previousState == ExecutionState.CANCELING) {
-				return new TaskCancelResult(getID(), ReturnCode.SUCCESS);
-			}
-
-			// Do not trigger the cancel request when vertex is in state STARTING, this might cause a race between RPC
-			// calls.
-			if (previousState == ExecutionState.STARTING) {
-
-				this.cancelRequested.set(true);
-
-				// We had a race, so we unset the flag and take care of cancellation ourselves
-				if (this.executionState.get() != ExecutionState.STARTING) {
-					this.cancelRequested.set(false);
-					continue;
-				}
-
-				return new TaskCancelResult(getID(), AbstractTaskResult.TaskOperationResult.SUCCESS);
-			}
-
-			// Check if we had a race. If state change is accepted, send cancel request
-			if (compareAndUpdateExecutionState(previousState, ExecutionState.CANCELING)) {
-
-				if (this.groupVertex.getStageNumber() != this.executionGraph.getIndexOfCurrentExecutionStage()) {
-					// Set to canceled directly
-					updateExecutionState(ExecutionState.CANCELED, null);
-					return new TaskCancelResult(getID(), AbstractTaskResult.TaskOperationResult.SUCCESS);
-				}
-
-				if (previousState != ExecutionState.RUNNING && previousState != ExecutionState.FINISHING) {
-					// Set to canceled directly
-					updateExecutionState(ExecutionState.CANCELED, null);
-					return new TaskCancelResult(getID(), AbstractTaskResult.TaskOperationResult.SUCCESS);
-				}
-
-				final AllocatedResource ar = this.allocatedResource.get();
-
-				if (ar == null) {
-					final TaskCancelResult result = new TaskCancelResult(getID(),
-						AbstractTaskResult.TaskOperationResult.NO_INSTANCE);
-					result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
-					return result;
-				}
-
-				try {
-					return ar.getInstance().cancelTask(this.vertexID);
-
-				} catch (IOException e) {
-					final TaskCancelResult result = new TaskCancelResult(getID(),
-						AbstractTaskResult.TaskOperationResult.IPC_ERROR);
-					result.setDescription(StringUtils.stringifyException(e));
-					return result;
-				}
+			else {
+				throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
 			}
 		}
 	}
-
-	/**
-	 * Returns the {@link ExecutionGraph} this vertex belongs to.
-	 * 
-	 * @return the {@link ExecutionGraph} this vertex belongs to
-	 */
-	public ExecutionGraph getExecutionGraph() {
-
-		return this.executionGraph;
+	
+	public void scheduleForExecution(DefaultScheduler scheduler, boolean queued) throws NoResourceAvailableException {
+		this.currentExecution.scheduleForExecution(scheduler, queued);
 	}
-
-
-	@Override
-	public String toString() {
-
-		final StringBuilder sb = new StringBuilder(this.groupVertex.getName());
-		sb.append(" (");
-		sb.append(this.indexInVertexGroup + 1);
-		sb.append('/');
-		sb.append(this.groupVertex.getCurrentNumberOfGroupMembers());
-		sb.append(')');
-
-		return sb.toString();
+	
+	public void deployToSlot(AllocatedSlot slot) throws JobException {
+		this.currentExecution.deployToSlot(slot);
 	}
-
-	/**
-	 * Returns the task represented by this vertex has
-	 * a retry attempt left in case of an execution
-	 * failure.
-	 * 
-	 * @return <code>true</code> if the task has a retry attempt left, <code>false</code> otherwise
-	 */
-	@Deprecated
-	public boolean hasRetriesLeft() {
-		if (this.retriesLeft.get() <= 0) {
-			return false;
-		}
-		return true;
+	
+	public void cancel() {
+		this.currentExecution.cancel();
 	}
-
-	/**
-	 * Decrements the number of retries left and checks whether another attempt to run the task is possible.
-	 * 
-	 * @return <code>true</code>if the task represented by this vertex can be started at least once more,
-	 *         <code>false/<code> otherwise
-	 */
-	public boolean decrementRetriesLeftAndCheck() {
-		return (this.retriesLeft.decrementAndGet() > 0);
+	
+	public void fail(Throwable t) {
+		this.currentExecution.fail(t);
 	}
-
-	/**
-	 * Registers the {@link VertexAssignmentListener} object for this vertex. This object
-	 * will be notified about reassignments of this vertex to another instance.
-	 * 
-	 * @param vertexAssignmentListener
-	 *        the object to be notified about reassignments of this vertex to another instance
-	 */
-	public void registerVertexAssignmentListener(final VertexAssignmentListener vertexAssignmentListener) {
-
-		this.vertexAssignmentListeners.addIfAbsent(vertexAssignmentListener);
+	
+	// --------------------------------------------------------------------------------------------
+	//   Notifications from the Execution Attempt
+	// --------------------------------------------------------------------------------------------
+	
+	void executionFinished() {
+		jobVertex.vertexFinished(subTaskIndex);
 	}
-
-	/**
-	 * Unregisters the {@link VertexAssignmentListener} object for this vertex. This object
-	 * will no longer be notified about reassignments of this vertex to another instance.
-	 * 
-	 * @param vertexAssignmentListener
-	 *        the listener to be unregistered
-	 */
-	public void unregisterVertexAssignmentListener(final VertexAssignmentListener vertexAssignmentListener) {
-
-		this.vertexAssignmentListeners.remove(vertexAssignmentListener);
+	
+	void executionCanceled() {
+		jobVertex.vertexCancelled(subTaskIndex);
 	}
-
-
-	/**
-	 * Registers the {@link ExecutionListener} object for this vertex. This object
-	 * will be notified about particular events during the vertex's lifetime.
-	 * 
-	 * @param executionListener
-	 *        the object to be notified about particular events during the vertex's lifetime
-	 */
-	public void registerExecutionListener(final ExecutionListener executionListener) {
-
-		final Integer priority = Integer.valueOf(executionListener.getPriority());
-
-		if (priority.intValue() < 0) {
-			LOG.error("Priority for execution listener " + executionListener.getClass() + " must be non-negative.");
-			return;
-		}
-
-		final ExecutionListener previousValue = this.executionListeners.putIfAbsent(priority, executionListener);
-
-		if (previousValue != null) {
-			LOG.error("Cannot register " + executionListener.getClass() + " as an execution listener. Priority "
-				+ priority.intValue() + " is already taken.");
-		}
+	
+	void executionFailed(Throwable t) {
+		jobVertex.vertexFailed(subTaskIndex, t);
 	}
-
+	
+	// --------------------------------------------------------------------------------------------
+	//   Miscellaneous
+	// --------------------------------------------------------------------------------------------
+	
 	/**
-	 * Unregisters the {@link ExecutionListener} object for this vertex. This object
-	 * will no longer be notified about particular events during the vertex's lifetime.
+	 * Simply forward this notification. This is for logs and event archivers.
 	 * 
-	 * @param executionListener
-	 *        the object to be unregistered
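+	 * @param executionId The ID of the execution attempt that the notification refers to.
+	 * @param newState The execution state reported with the notification.
+	 * @param error The error forwarded with the notification; presumably null if the transition has no error.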
 	 */
-	public void unregisterExecutionListener(final ExecutionListener executionListener) {
-
-		this.executionListeners.remove(Integer.valueOf(executionListener.getPriority()));
+	void notifyStateTransition(ExecutionAttemptID executionId, ExecutionState newState, Throwable error) {
+		getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
 	}
-
-
-	/**
-	 * Sets the {@link ExecutionPipeline} this vertex shall be part of.
-	 * 
-	 * @param executionPipeline
-	 *        the execution pipeline this vertex shall be part of
-	 */
-	void setExecutionPipeline(final ExecutionPipeline executionPipeline) {
-
-		final ExecutionPipeline oldPipeline = this.executionPipeline.getAndSet(executionPipeline);
-		if (oldPipeline != null) {
-			oldPipeline.removeFromPipeline(this);
+	
+	TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, AllocatedSlot slot) {
+		//  create the input gate deployment descriptors
+		List<GateDeploymentDescriptor> inputGates = new ArrayList<GateDeploymentDescriptor>(inputEdges.length);
+		for (ExecutionEdge[] channels : inputEdges) {
+			inputGates.add(GateDeploymentDescriptor.fromEdges(channels));
 		}
-
-		executionPipeline.addToPipeline(this);
+		
+		// create the output gate deployment descriptors
+		List<GateDeploymentDescriptor> outputGates = new ArrayList<GateDeploymentDescriptor>(resultPartitions.length);
+		for (IntermediateResultPartition partition : resultPartitions) {
+			for (List<ExecutionEdge> channels : partition.getConsumers()) {
+				outputGates.add(GateDeploymentDescriptor.fromEdges(channels));
+			}
+		}
+		
+		String[] jarFiles = getExecutionGraph().getUserCodeJarFiles();
+		
+		return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(), 
+				subTaskIndex, getTotalNumberOfParallelSubtasks(), 
+				getExecutionGraph().getJobConfiguration(), jobVertex.getJobVertex().getConfiguration(),
+				jobVertex.getJobVertex().getInvokableClassName(), outputGates, inputGates, jarFiles, slot.getSlotNumber());
 	}
-
-	/**
-	 * Returns the {@link ExecutionPipeline} this vertex is part of.
-	 * 
-	 * @return the execution pipeline this vertex is part of
-	 */
-	public ExecutionPipeline getExecutionPipeline() {
-
-		return this.executionPipeline.get();
+	
+	
+	// --------------------------------------------------------------------------------------------
+	//  Utilities
+	// --------------------------------------------------------------------------------------------
+	
+	public void execute(Runnable action) {
+		this.jobVertex.execute(action);
 	}
-
+	
 	/**
-	 * Constructs a new task deployment descriptor for this vertex.
+	 * Creates a simple name representation in the style 'taskname (x/y)', where
+	 * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
+	 * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total
+	 * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}.
 	 * 
-	 * @return a new task deployment descriptor for this vertex
+	 * @return A simple name representation.
 	 */
-	public TaskDeploymentDescriptor constructDeploymentDescriptor() {
-
-		final SerializableArrayList<GateDeploymentDescriptor> ogd = new SerializableArrayList<GateDeploymentDescriptor>(
-			this.outputGates.length);
-		for (int i = 0; i < this.outputGates.length; ++i) {
-
-			final ExecutionGate eg = this.outputGates[i];
-			final List<ChannelDeploymentDescriptor> cdd = new ArrayList<ChannelDeploymentDescriptor>(
-				eg.getNumberOfEdges());
-			final int numberOfOutputChannels = eg.getNumberOfEdges();
-			for (int j = 0; j < numberOfOutputChannels; ++j) {
-
-				final ExecutionEdge ee = eg.getEdge(j);
-				cdd.add(new ChannelDeploymentDescriptor(ee.getOutputChannelID(), ee.getInputChannelID()));
-			}
-
-			ogd.add(new GateDeploymentDescriptor(eg.getGateID(), eg.getChannelType(), cdd));
-		}
-
-		final SerializableArrayList<GateDeploymentDescriptor> igd = new SerializableArrayList<GateDeploymentDescriptor>(
-			this.inputGates.length);
-		for (int i = 0; i < this.inputGates.length; ++i) {
-
-			final ExecutionGate eg = this.inputGates[i];
-			final List<ChannelDeploymentDescriptor> cdd = new ArrayList<ChannelDeploymentDescriptor>(
-				eg.getNumberOfEdges());
-			final int numberOfInputChannels = eg.getNumberOfEdges();
-			for (int j = 0; j < numberOfInputChannels; ++j) {
-
-				final ExecutionEdge ee = eg.getEdge(j);
-				cdd.add(new ChannelDeploymentDescriptor(ee.getOutputChannelID(), ee.getInputChannelID()));
-			}
-
-			igd.add(new GateDeploymentDescriptor(eg.getGateID(), eg.getChannelType(), cdd));
-		}
-
-		final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(this.executionGraph.getJobID(),
-			this.vertexID, this.groupVertex.getName(), this.indexInVertexGroup,
-			this.groupVertex.getCurrentNumberOfGroupMembers(), this.executionGraph.getJobConfiguration(),
-			this.groupVertex.getConfiguration(), this.groupVertex.getInvokableClass(), ogd,
-			igd);
-
-		return tdd;
+	public String getSimpleName() {
+		return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')';
 	}
 	
-	public void handleException(Throwable t) {
-		
+	@Override
+	public String toString() {
+		return getSimpleName();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
deleted file mode 100644
index 787605d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
+++ /dev/null
@@ -1,710 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-import org.apache.commons.logging.Log;
-import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState2;
-import org.apache.flink.runtime.instance.AllocatedSlot;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobEdge;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
-import org.apache.flink.runtime.taskmanager.TaskOperationResult;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-
-import static org.apache.flink.runtime.execution.ExecutionState2.*;
-
-/**
- * 
- * NOTE ABOUT THE DESIGN RATIONAL:
- * 
- * In several points of the code, we need to deal with possible concurrent state changes and actions.
- * For example, while the call to deploy a task (send it to the TaskManager) happens, the task gets cancelled.
- * 
- * We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
- * it is guaranteed that any "cancel command" will only pick up after deployment is done and that the "cancel
- * command" call will never overtake the deploying call.
- * 
- * This blocks the threads big time, because the remote calls may take long. Depending of their locking behavior, it
- * may even result in distributed deadlocks (unless carefully avoided). We therefore use atomic state updates and
- * occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
- * actions if it is not. Many actions are also idempotent (like canceling).
- */
-public class ExecutionVertex2 {
-	
-	private static final AtomicReferenceFieldUpdater<ExecutionVertex2, ExecutionState2> STATE_UPDATER =
-			AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, ExecutionState2.class, "state");
-	
-	private static final AtomicReferenceFieldUpdater<ExecutionVertex2, ExecutionAttempt> ATTEMPT_UPDATER =
-			AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, ExecutionAttempt.class, "currentOrLastAttempt");
-
-	private static final Logger LOG = ExecutionGraph.LOG;
-	
-	private static final int NUM_CANCEL_CALL_TRIES = 3;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private final ExecutionJobVertex jobVertex;
-	
-	private final IntermediateResultPartition[] resultPartitions;
-	
-	private final ExecutionEdge2[][] inputEdges;
-	
-	private final int subTaskIndex;
-	
-	private final long[] stateTimestamps;
-	
-	private final List<ExecutionAttempt> priorAttempts;
-	
-	private volatile ExecutionAttempt currentOrLastAttempt;
-	
-	private volatile ExecutionState2 state = CREATED;
-	
-	
-	public ExecutionVertex2(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets) {
-		this.jobVertex = jobVertex;
-		this.subTaskIndex = subTaskIndex;
-		
-		this.resultPartitions = new IntermediateResultPartition[producedDataSets.length];
-		for (int i = 0; i < producedDataSets.length; i++) {
-			IntermediateResultPartition irp = new IntermediateResultPartition(producedDataSets[i], this, subTaskIndex);
-			this.resultPartitions[i] = irp;
-			producedDataSets[i].setPartition(subTaskIndex, irp);
-		}
-		
-		this.inputEdges = new ExecutionEdge2[jobVertex.getJobVertex().getInputs().size()][];
-		
-		this.stateTimestamps = new long[ExecutionState2.values().length];
-		this.priorAttempts = new CopyOnWriteArrayList<ExecutionAttempt>();
-	}
-	
-	
-	// --------------------------------------------------------------------------------------------
-	//  Properties
-	// --------------------------------------------------------------------------------------------
-	
-	public JobID getJobId() {
-		return this.jobVertex.getJobId();
-	}
-	
-	public JobVertexID getJobvertexId() {
-		return this.jobVertex.getJobVertexId();
-	}
-	
-	public String getTaskName() {
-		return this.jobVertex.getJobVertex().getName();
-	}
-	
-	public int getTotalNumberOfParallelSubtasks() {
-		return this.jobVertex.getParallelism();
-	}
-	
-	public int getParallelSubtaskIndex() {
-		return this.subTaskIndex;
-	}
-	
-	public int getNumberOfInputs() {
-		return this.inputEdges.length;
-	}
-	
-	public ExecutionEdge2[] getInputEdges(int input) {
-		if (input < 0 || input >= this.inputEdges.length) {
-			throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, this.inputEdges.length));
-		}
-		return inputEdges[input];
-	}
-	
-	public ExecutionState2 getExecutionState() {
-		return state;
-	}
-	
-	public long getStateTimestamp(ExecutionState2 state) {
-		return this.stateTimestamps[state.ordinal()];
-	}
-	
-	private ExecutionGraph getExecutionGraph() {
-		return this.jobVertex.getGraph();
-	}
-	
-	public Throwable getLastFailureCause() {
-		// copy reference to the stack
-		ExecutionAttempt attempt = this.currentOrLastAttempt;
-		if (attempt != null) {
-			return attempt.getFailureCause();
-		}
-		else if (priorAttempts.size() > 0) {
-			// since the list is append-only, this always works in the presence of concurrent modifications
-			return priorAttempts.get(priorAttempts.size() - 1).getFailureCause();
-		}
-		else {
-			return null;
-		}
-	}
-	
-	public AllocatedSlot getCurrentAssignedResource() {
-		// copy reference to the stack
-		ExecutionAttempt attempt = this.currentOrLastAttempt;
-		return attempt == null ? null : attempt.getAssignedResource();
-	}
-	
-	public AllocatedSlot getLastAssignedResource() {
-		// copy reference to the stack
-		ExecutionAttempt attempt = this.currentOrLastAttempt;
-		if (attempt != null) {
-			return attempt.getAssignedResource();
-		}
-		else if (priorAttempts.size() > 0) {
-			// since the list is append-only, this always works in the presence of concurrent modifications
-			return priorAttempts.get(priorAttempts.size() - 1).getAssignedResource();
-		}
-		else {
-			return null;
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Graph building
-	// --------------------------------------------------------------------------------------------
-	
-	public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
-		
-		final DistributionPattern pattern = edge.getDistributionPattern();
-		final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
-		
-		ExecutionEdge2[] edges = null;
-		
-		switch (pattern) {
-			case POINTWISE:
-				edges = connectPointwise(sourcePartitions, inputNumber);
-				break;
-				
-			case BIPARTITE: 
-				edges = connectAllToAll(sourcePartitions, inputNumber);
-				break;
-				
-			default:
-				throw new RuntimeException("Unrecognized distribution pattern.");
-		
-		}
-		
-		this.inputEdges[inputNumber] = edges;
-		
-		// add the cousumers to the source
-		for (ExecutionEdge2 ee : edges) {
-			ee.getSource().addConsumer(ee, consumerNumber);
-		}
-	}
-	
-	private ExecutionEdge2[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
-		ExecutionEdge2[] edges = new ExecutionEdge2[sourcePartitions.length];
-		
-		for (int i = 0; i < sourcePartitions.length; i++) {
-			IntermediateResultPartition irp = sourcePartitions[i];
-			edges[i] = new ExecutionEdge2(irp, this, inputNumber);
-		}
-		
-		return edges;
-	}
-	
-	private ExecutionEdge2[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
-		final int numSources = sourcePartitions.length;
-		final int parallelism = getTotalNumberOfParallelSubtasks();
-		
-		// simple case same number of sources as targets
-		if (numSources == parallelism) {
-			return new ExecutionEdge2[] { new ExecutionEdge2(sourcePartitions[subTaskIndex], this, inputNumber) };
-		}
-		else if (numSources < parallelism) {
-			
-			int sourcePartition;
-			
-			// check if the pattern is regular or irregular
-			// we use int arithmetics for regular, and floating point with rounding for irregular
-			if (parallelism % numSources == 0) {
-				// same number of targets per source
-				int factor = parallelism / numSources;
-				sourcePartition = subTaskIndex / factor;
-			}
-			else {
-				// different number of targets per source
-				float factor = ((float) parallelism) / numSources;
-				sourcePartition = (int) (subTaskIndex / factor);
-			}
-			
-			return new ExecutionEdge2[] { new ExecutionEdge2(sourcePartitions[sourcePartition], this, inputNumber) };
-		}
-		else {
-			if (numSources % parallelism == 0) {
-				// same number of targets per source
-				int factor = numSources / parallelism;
-				int startIndex = subTaskIndex * factor;
-				
-				ExecutionEdge2[] edges = new ExecutionEdge2[factor];
-				for (int i = 0; i < factor; i++) {
-					edges[i] = new ExecutionEdge2(sourcePartitions[startIndex + i], this, inputNumber);
-				}
-				return edges;
-			}
-			else {
-				float factor = ((float) numSources) / parallelism;
-				
-				int start = (int) (subTaskIndex * factor);
-				int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ?
-						sourcePartitions.length : 
-						(int) ((subTaskIndex + 1) * factor);
-				
-				ExecutionEdge2[] edges = new ExecutionEdge2[end - start];
-				for (int i = 0; i < edges.length; i++) {
-					edges[i] = new ExecutionEdge2(sourcePartitions[start + i], this, inputNumber);
-				}
-				
-				return edges;
-			}
-		}
-	}
-
-	
-	// --------------------------------------------------------------------------------------------
-	//  Scheduling
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the tasks needs
-	 *       to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
-	 *       error sets the vertex state to failed and triggers the recovery logic.
-	 * 
-	 * @param scheduler
-	 * 
-	 * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
-	 * @throws NoResourceAvailableException Thrown is no queued scheduling is allowed and no resources are currently available.
-	 */
-	public void scheduleForExecution(DefaultScheduler scheduler) throws NoResourceAvailableException {
-		if (scheduler == null) {
-			throw new NullPointerException();
-		}
-		
-		if (STATE_UPDATER.compareAndSet(this, CREATED, SCHEDULED)) {
-			
-			getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, SCHEDULED, null);
-			
-			ScheduledUnit toSchedule = new ScheduledUnit(this, jobVertex.getSlotSharingGroup());
-		
-			// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
-			//     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
-			
-			boolean queued = jobVertex.getGraph().isQueuedSchedulingAllowed();
-			if (queued) {
-				SlotAllocationFuture future = scheduler.scheduleQueued(toSchedule);
-				
-				future.setFutureAction(new SlotAllocationFutureAction() {
-					@Override
-					public void slotAllocated(AllocatedSlot slot) {
-						try {
-							deployToSlot(slot);
-						}
-						catch (Throwable t) {
-							try {
-								slot.releaseSlot();
-							} finally {
-								fail(t);
-							}
-						}
-					}
-				});
-			}
-			else {
-				AllocatedSlot slot = scheduler.scheduleImmediately(toSchedule);
-				try {
-					deployToSlot(slot);
-				}
-				catch (Throwable t) {
-					try {
-						slot.releaseSlot();
-					} finally {
-						fail(t);
-					}
-				}
-			}
-		}
-		else if (this.state == CANCELED) {
-			// this can occur very rarely through heavy races. if the task was canceled, we do not
-			// schedule it
-			return;
-		}
-		else {
-			throw new IllegalStateException("The vertex must be in CREATED state to be scheduled.");
-		}
-	}
-	
-
-	public void deployToSlot(final AllocatedSlot slot) throws JobException {
-		// sanity checks
-		if (slot == null) {
-			throw new NullPointerException();
-		}
-		if (!slot.isAlive()) {
-			throw new IllegalArgumentException("Cannot deploy to a slot that is not alive.");
-		}
-		
-		// make sure exactly one deployment call happens from the correct state
-		// note: the transition from CREATED to DEPLOYING is for testing purposes only
-		ExecutionState2 previous = this.state;
-		if (previous == SCHEDULED || previous == CREATED) {
-			if (!STATE_UPDATER.compareAndSet(this, previous, DEPLOYING)) {
-				// race condition, someone else beat us to the deploying call.
-				// this should actually not happen and indicates a race somewhere else
-				throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
-			}
-			
-			getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, DEPLOYING, null);
-		}
-		else {
-			// vertex may have been cancelled, or it was already scheduled
-			throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
-		}
-		
-		// good, we are allowed to deploy
-		if (!slot.setExecutedVertex(this)) {
-			throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
-		}
-		setAssignedSlot(slot);
-		
-		
-		final TaskDeploymentDescriptor deployment = createDeploymentDescriptor();
-		
-		// we execute the actual deploy call in a concurrent action to prevent this call from blocking for long
-		Runnable deployaction = new Runnable() {
-
-			@Override
-			public void run() {
-				try {
-					Instance instance = slot.getInstance();
-					instance.checkLibraryAvailability(getJobId());
-					
-					TaskOperationResult result = instance.getTaskManagerProxy().submitTask(deployment);
-					if (result.isSuccess()) {
-						switchToRunning();
-					}
-					else {
-						// deployment failed :(
-						fail(new Exception("Failed to deploy the tast to slot " + slot + ": " + result.getDescription()));
-					}
-				}
-				catch (Throwable t) {
-					// some error occurred. fail the task
-					fail(t);
-				}
-			}
-		};
-		
-		execute(deployaction);
-	}
-	
-	private void switchToRunning() {
-		
-		// transition state
-		if (STATE_UPDATER.compareAndSet(ExecutionVertex2.this, DEPLOYING, RUNNING)) {
-			
-			getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, RUNNING, null);
-			
-			this.jobVertex.vertexSwitchedToRunning(subTaskIndex);
-		}
-		else {
-			// something happened while the call was in progress.
-			// typically, that means canceling while deployment was in progress
-			
-			ExecutionState2 currentState = ExecutionVertex2.this.state;
-			
-			if (currentState == CANCELING) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug(String.format("Concurrent canceling of task %s while deployment was in progress.", ExecutionVertex2.this.toString()));
-				}
-				
-				sendCancelRpcCall();
-			} else {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug(String.format("Concurrent unexpected state transition of task %s while deployment was in progress.", ExecutionVertex2.this.toString()));
-				}
-				
-				// undo the deployment
-				sendCancelRpcCall();
-				
-				// record the failure
-				fail(new Exception("Asynchronous state error. Execution Vertex switched to " + currentState + " while deployment was in progress."));
-			}
-		}
-	}
-	
-	public void cancel() {
-		// depending on the previous state, we go directly to cancelled (no cancel call necessary)
-		// -- or to canceling (cancel call needs to be sent to the task manager)
-		
-		// because of several possibly previous states, we need to again loop until we make a
-		// successful atomic state transition
-		while (true) {
-			
-			ExecutionState2 current = this.state;
-			
-			if (current == CANCELING || current == CANCELED) {
-				// already taken care of, no need to cancel again
-				return;
-			}
-				
-			// these two are the common cases where we need to send a cancel call
-			else if (current == RUNNING || current == DEPLOYING) {
-				// try to transition to canceling, if successful, send the cancel call
-				if (STATE_UPDATER.compareAndSet(this, current, CANCELING)) {
-					
-					getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, CANCELING, null);
-					
-					sendCancelRpcCall();
-					return;
-				}
-				// else: fall through the loop
-			}
-			
-			else if (current == FINISHED || current == FAILED) {
-				// nothing to do any more. finished failed before it could be cancelled.
-				// in any case, the task is removed from the TaskManager already
-				return;
-			}
-			else if (current == CREATED || current == SCHEDULED) {
-				// from here, we can directly switch to cancelled, because the no task has been deployed
-				if (STATE_UPDATER.compareAndSet(this, current, CANCELED)) {
-					
-					getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, CANCELED, null);
-					
-					return;
-				}
-				// else: fall through the loop
-			}
-			else {
-				throw new IllegalStateException(current.name());
-			}
-		}
-	}
-	
-	public void fail(Throwable t) {
-		
-		// damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
-		// the actual computation on the task manager is cleaned up by the taskmanager that noticed the failure
-		
-		// we may need to loop multiple times (in the presence of concurrent calls) in order to
-		// atomically switch to failed 
-		while (true) {
-			ExecutionState2 current = this.state;
-			
-			if (current == FAILED) {
-				// concurrently set to failed. It is enough to remember once that we failed (its sad enough)
-				return;
-			}
-			
-			if (current == CANCELED) {
-				// we already aborting
-				if (LOG.isDebugEnabled()) {
-					LOG.debug(String.format("Ignoring transition of vertex %s to %s while being %s",
-							getSimpleName(), FAILED, current));
-				}
-				return;
-			}
-			
-			// we should be in DEPLOYING or RUNNING when a regular failure happens
-			if (current != DEPLOYING && current != RUNNING && current != CANCELING) {
-				// this should not happen. still, what else to do but to comply and go to the FAILED state
-				// at least we should complain loudly to the log
-				LOG.error(String.format("Vertex %s unexpectedly went from state %s to %s with error: %s",
-						getSimpleName(), CREATED, FAILED, t.getMessage()), t);
-			}
-			
-			if (STATE_UPDATER.compareAndSet(this, current, FAILED)) {
-				// success (in a manner of speaking)
-				this.failureCause = t;
-				
-				getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, FAILED, StringUtils.stringifyException(t));
-				
-				// release the slot (concurrency safe)
-				setAssignedSlot(null);
-				
-				this.jobVertex.vertexFailed(subTaskIndex);
-				
-				// leave the loop
-				return;
-			}
-		}
-	}
-	
-	private void sendCancelRpcCall() {
-		// first of all, copy a reference to the stack. any concurrent change to the
-		// field does not affect us now
-		final AllocatedSlot slot = this.assignedSlot;
-		if (slot == null) {
-			throw new IllegalStateException("Cannot cancel when task was not running or deployed.");
-		}
-		
-		Runnable cancelAction = new Runnable() {
-			
-			@Override
-			public void run() {
-				Throwable exception = null;
-				
-				for (int triesLeft = NUM_CANCEL_CALL_TRIES; triesLeft > 0; --triesLeft) {
-					
-					try {
-						// send the call. it may be that the task is not really there (asynchronous / overtaking messages)
-						// in which case it is fine (the deployer catches it)
-						TaskOperationResult result = slot.getInstance().getTaskManagerProxy().cancelTask(getJobvertexId(), subTaskIndex);
-						
-						if (result.isSuccess()) {
-							
-							// make sure that we release the slot
-							try {
-								// found and canceled
-								if (STATE_UPDATER.compareAndSet(ExecutionVertex2.this, CANCELING, CANCELED)) {
-									// we completed the call. 
-									// release the slot resource and let the parent know we have cancelled
-									ExecutionVertex2.this.jobVertex.vertexCancelled(ExecutionVertex2.this.subTaskIndex);
-								}
-								else {
-									ExecutionState2 foundState = ExecutionVertex2.this.state;
-									// failing in the meantime may happen and is no problem
-									if (foundState != FAILED) {
-										// corner case? log at least
-										LOG.error(String.format("Asynchronous race: Found state %s after successful cancel call.", foundState));
-									}
-									
-								}
-							} finally {
-								slot.releaseSlot();
-							}
-						}
-						else {
-							// the task was not found, which may be when the task concurrently finishes or fails, or
-							// when the cancel call overtakes the deployment call
-							if (LOG.isDebugEnabled()) {
-								LOG.debug("Cancel task call did not find task. Probably cause: Acceptable asynchronous race.");
-							}
-						}
-						
-						// in any case, we need not call multiple times, so we quit
-						return;
-					}
-					catch (Throwable t) {
-						if (exception == null) {
-							exception = t;
-						}
-						LOG.error("Canceling vertex " + getSimpleName() + " failed (" + triesLeft + " tries left): " + t.getMessage() , t);
-					}
-				}
-				
-				// dang, utterly unsuccessful - the target node must be down, in which case the tasks are lost anyways
-				fail(new Exception("Task could not be canceled.", exception));
-			}
-		};
-		
-		execute(cancelAction);
-	}
-	
-	public Iterable<Instance> getPreferredLocations() {
-		return null;
-	}
-	
-	private void setAssignedSlot(AllocatedSlot slot) {
-		
-		while (true) {
-			AllocatedSlot previous = this.assignedSlot;
-			if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, previous, slot)) {
-				// successfully swapped
-				// release the predecessor, if it was not null. this call is idempotent, so it does not matter if it is
-				// called more than once
-				try {
-					if (previous != null) {
-						previous.releaseSlot();
-					}
-				} catch (Throwable t) {
-					LOG.debug("Error releasing slot " + slot, t);
-				}
-				return;
-			}
-		}
-	}
-	
-	
-	private TaskDeploymentDescriptor createDeploymentDescriptor() {
-		//  create the input gate deployment descriptors
-		List<GateDeploymentDescriptor> inputGates = new ArrayList<GateDeploymentDescriptor>(inputEdges.length);
-		for (ExecutionEdge2[] channels : inputEdges) {
-			inputGates.add(GateDeploymentDescriptor.fromEdges(channels));
-		}
-		
-		// create the output gate deployment descriptors
-		List<GateDeploymentDescriptor> outputGates = new ArrayList<GateDeploymentDescriptor>(resultPartitions.length);
-		for (IntermediateResultPartition partition : resultPartitions) {
-			for (List<ExecutionEdge2> channels : partition.getConsumers()) {
-				outputGates.add(GateDeploymentDescriptor.fromEdges(channels));
-			}
-		}
-		
-		String[] jarFiles = getExecutionGraph().getUserCodeJarFiles();
-		
-		return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), getTaskName(), 
-				subTaskIndex, getTotalNumberOfParallelSubtasks(), 
-				getExecutionGraph().getJobConfiguration(), jobVertex.getJobVertex().getConfiguration(),
-				jobVertex.getJobVertex().getInvokableClassName(), outputGates, inputGates, jarFiles);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Utilities
-	// --------------------------------------------------------------------------------------------
-	
-	public void execute(Runnable action) {
-		this.jobVertex.execute(action);
-	}
-	
-	/**
-	 * Creates a simple name representation in the style 'taskname (x/y)', where
-	 * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
-	 * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total
-	 * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}.
-	 * 
-	 * @return A simple name representation.
-	 */
-	public String getSimpleName() {
-		return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')';
-	}
-	
-	@Override
-	public String toString() {
-		return getSimpleName() + " [" + state + ']';
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index b733baa..1c4e1fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -25,21 +25,21 @@ public class IntermediateResultPartition {
 	
 	private final IntermediateResult totalResut;
 	
-	private final ExecutionVertex2 producer;
+	private final ExecutionVertex producer;
 	
 	private final int partition;
 	
-	private List<List<ExecutionEdge2>> consumers;
+	private List<List<ExecutionEdge>> consumers;
 	
 	
-	public IntermediateResultPartition(IntermediateResult totalResut, ExecutionVertex2 producer, int partition) {
+	public IntermediateResultPartition(IntermediateResult totalResut, ExecutionVertex producer, int partition) {
 		this.totalResut = totalResut;
 		this.producer = producer;
 		this.partition = partition;
-		this.consumers = new ArrayList<List<ExecutionEdge2>>(0);
+		this.consumers = new ArrayList<List<ExecutionEdge>>(0);
 	}
 	
-	public ExecutionVertex2 getProducer() {
+	public ExecutionVertex getProducer() {
 		return producer;
 	}
 	
@@ -51,17 +51,23 @@ public class IntermediateResultPartition {
 		return totalResut;
 	}
 	
-	public List<List<ExecutionEdge2>> getConsumers() {
+	public List<List<ExecutionEdge>> getConsumers() {
 		return consumers;
 	}
 	
 	int addConsumerGroup() {
 		int pos = consumers.size();
-		consumers.add(new ArrayList<ExecutionEdge2>());
+		
+		// NOTE: currently we support only one consumer per result!!!
+		if (pos != 0) {
+			throw new RuntimeException("Currenty, each intermediate result can only have one consumer.");
+		}
+		
+		consumers.add(new ArrayList<ExecutionEdge>());
 		return pos;
 	}
 	
-	public void addConsumer(ExecutionEdge2 edge, int consumerNumber) {
+	void addConsumer(ExecutionEdge edge, int consumerNumber) {
 		consumers.get(consumerNumber).add(edge);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index f3f489b..2f3fa10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.instance;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.jobgraph.JobID;
 
 /**
@@ -32,8 +32,8 @@ public class AllocatedSlot {
 	private static final AtomicIntegerFieldUpdater<AllocatedSlot> STATUS_UPDATER = 
 			AtomicIntegerFieldUpdater.newUpdater(AllocatedSlot.class, "status");
 	
-	private static final AtomicReferenceFieldUpdater<AllocatedSlot, ExecutionVertex2> VERTEX_UPDATER =
-			AtomicReferenceFieldUpdater.newUpdater(AllocatedSlot.class, ExecutionVertex2.class, "executedVertex");
+	private static final AtomicReferenceFieldUpdater<AllocatedSlot, Execution> VERTEX_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(AllocatedSlot.class, Execution.class, "executedTask");
 	
 	private static final int ALLOCATED_AND_ALIVE = 0;		// tasks may be added and might be running
 	private static final int CANCELLED = 1;					// no more tasks may run
@@ -49,8 +49,8 @@ public class AllocatedSlot {
 	/** The number of the slot on which the task is deployed */
 	private final int slotNumber;
 	
-	/** Vertex being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
-	private volatile ExecutionVertex2 executedVertex;
+	/** Task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
+	private volatile Execution executedTask;
 	
 	/** The state of the vertex, only atomically updated */
 	private volatile int status = ALLOCATED_AND_ALIVE;
@@ -85,7 +85,11 @@ public class AllocatedSlot {
 		return slotNumber;
 	}
 	
-	public boolean setExecutedVertex(ExecutionVertex2 executedVertex) {
+	public Execution getExecutedVertex() {
+		return executedTask;
+	}
+	
+	public boolean setExecutedVertex(Execution executedVertex) {
 		if (executedVertex == null) {
 			throw new NullPointerException();
 		}
@@ -102,17 +106,13 @@ public class AllocatedSlot {
 
 		// we need to do a double check that we were not cancelled in the meantime
 		if (status != ALLOCATED_AND_ALIVE) {
-			this.executedVertex = null;
+			this.executedTask = null;
 			return false;
 		}
 		
 		return true;
 	}
 	
-	public ExecutionVertex2 getExecutedVertex() {
-		return executedVertex;
-	}
-	
 	// --------------------------------------------------------------------------------------------
 	//  Status and life cycle
 	// --------------------------------------------------------------------------------------------
@@ -133,8 +133,9 @@ public class AllocatedSlot {
 	public void cancel() {
 		if (STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, CANCELLED)) {
 			// kill all tasks currently running in this slot
-			if (this.executedVertex != null) {
-				this.executedVertex.fail(new Exception("The slot in which the task was scheduled has been cancelled."));
+			Execution exec = this.executedTask;
+			if (exec != null && !exec.isFinished()) {
+				exec.fail(new Exception("The slot in which the task was scheduled has been cancelled."));
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 543ae86..bc30254 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -197,19 +197,21 @@ public class Instance {
 			throw new IOException("No entry of required libraries for job " + jobID);
 		}
 
-		LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
-		request.setRequiredLibraries(requiredLibraries);
-
-		// Send the request
-		LibraryCacheProfileResponse response = getTaskManagerProxy().getLibraryCacheProfile(request);
-
-		// Check response and transfer libraries if necessary
-		for (int k = 0; k < requiredLibraries.length; k++) {
-			if (!response.isCached(k)) {
-				LibraryCacheUpdate update = new LibraryCacheUpdate(requiredLibraries[k]);
-				getTaskManagerProxy().updateLibraryCache(update);
+//		if (requiredLibraries.length > 0) {
+			LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
+			request.setRequiredLibraries(requiredLibraries);
+	
+			// Send the request
+			LibraryCacheProfileResponse response = getTaskManagerProxy().getLibraryCacheProfile(request);
+	
+			// Check response and transfer libraries if necessary
+			for (int k = 0; k < requiredLibraries.length; k++) {
+				if (!response.isCached(k)) {
+					LibraryCacheUpdate update = new LibraryCacheUpdate(requiredLibraries[k]);
+					getTaskManagerProxy().updateLibraryCache(update);
+				}
 			}
-		}
+//		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index ec63c00..c17e631 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -23,9 +23,9 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StringUtils;
 
 /**
  * This class encapsulates all connection information necessary to connect to the instance's task manager.
@@ -154,9 +154,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 		this.ipcPort = in.readInt();
 		this.dataPort = in.readInt();
 		
-		if (in.readBoolean()) {
-			this.hostName = StringRecord.readString(in);
-		}
+		this.hostName = StringUtils.readNullableString(in);
 
 		try {
 			this.inetAddress = InetAddress.getByAddress(address);
@@ -174,12 +172,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 		out.writeInt(this.ipcPort);
 		out.writeInt(this.dataPort);
 		
-		if (this.hostName != null) {
-			out.writeBoolean(true);
-			StringRecord.writeString(out, this.hostName);
-		} else {
-			out.writeBoolean(false);
-		}
+		StringUtils.writeNullableString(hostName, out);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index 5571ccb..3066bb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -110,6 +110,10 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 		this.globalBufferPool.destroy();
 	}
 
+	public GlobalBufferPool getGlobalBufferPool() {
+		return globalBufferPool;
+	}
+	
 	// -----------------------------------------------------------------------------------------------------------------
 	//                                               Task registration
 	// -----------------------------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
index 23d1205..cdf9b87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network;
 
 import java.io.IOException;
@@ -28,18 +27,17 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * Objects of this class uniquely identify a connection to a remote {@link TaskManager}.
- * 
+ * Objects of this class uniquely identify a connection to a remote {@link org.apache.flink.runtime.taskmanager.TaskManager}.
  */
 public final class RemoteReceiver implements IOReadableWritable {
 
 	/**
-	 * The address of the connection to the remote {@link TaskManager}.
+	 * The address of the connection to the remote TaskManager.
 	 */
 	private InetSocketAddress connectionAddress;
 
 	/**
-	 * The index of the connection to the remote {@link TaskManager}.
+	 * The index of the connection to the remote TaskManager.
 	 */
 	private int connectionIndex;
 
@@ -47,9 +45,9 @@ public final class RemoteReceiver implements IOReadableWritable {
 	 * Constructs a new remote receiver object.
 	 * 
 	 * @param connectionAddress
-	 *        the address of the connection to the remote {@link TaskManager}
+	 *        the address of the connection to the remote TaskManager
 	 * @param connectionIndex
-	 *        the index of the connection to the remote {@link TaskManager}
+	 *        the index of the connection to the remote TaskManager
 	 */
 	public RemoteReceiver(final InetSocketAddress connectionAddress, final int connectionIndex) {
 		if (connectionAddress == null) {
@@ -72,18 +70,18 @@ public final class RemoteReceiver implements IOReadableWritable {
 	}
 
 	/**
-	 * Returns the address of the connection to the remote {@link TaskManager}.
+	 * Returns the address of the connection to the remote TaskManager.
 	 * 
-	 * @return the address of the connection to the remote {@link TaskManager}
+	 * @return the address of the connection to the remote TaskManager
 	 */
 	public InetSocketAddress getConnectionAddress() {
 		return this.connectionAddress;
 	}
 
 	/**
-	 * Returns the index of the connection to the remote {@link TaskManager}.
+	 * Returns the index of the connection to the remote TaskManager.
 	 * 
-	 * @return the index of the connection to the remote {@link TaskManager}
+	 * @return the index of the connection to the remote TaskManager
 	 */
 	public int getConnectionIndex() {
 		return this.connectionIndex;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
index 6f494eb..5a0fbb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
@@ -16,23 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network.api;
 
 import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
 
-/**
- * 
- */
 public interface MutableReader<T extends IOReadableWritable> extends ReaderBase {
 	
-	/**
-	 * @param target
-	 * @return
-	 * @throws IOException
-	 * @throws InterruptedException
-	 */
 	boolean next(T target) throws IOException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
index 27658f1..1fecdbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.iterative.concurrent;
 import java.util.concurrent.CountDownLatch;
 
 /**
- * Resettable barrier to synchronize {@link IterationHeadPactTask} and {@link IterationTailPactTask} in case of
+ * Resettable barrier to synchronize the
+ * {@link org.apache.flink.runtime.iterative.task.IterationHeadPactTask} and
+ * the {@link org.apache.flink.runtime.iterative.task.IterationTailPactTask} in case of
  * iterations that contain a separate solution set tail.
  */
 public class SolutionSetUpdateBarrier {