You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/26 11:46:32 UTC

[07/53] [abbrv] Rework the Taskmanager to a slot based model and remove legacy cloud code

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
index 6e25796..8a3cba4 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
@@ -16,13 +16,14 @@ package eu.stratosphere.nephele.jobmanager;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import eu.stratosphere.nephele.ExecutionMode;
 import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
 import eu.stratosphere.nephele.taskmanager.TaskKillResult;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -68,14 +70,11 @@ import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
 import eu.stratosphere.nephele.executiongraph.GraphConversionException;
 import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
 import eu.stratosphere.nephele.executiongraph.JobStatusListener;
-import eu.stratosphere.nephele.instance.AbstractInstance;
 import eu.stratosphere.nephele.instance.DummyInstance;
 import eu.stratosphere.nephele.instance.HardwareDescription;
+import eu.stratosphere.nephele.instance.Instance;
 import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
 import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.instance.local.LocalInstanceManager;
 import eu.stratosphere.runtime.io.channels.ChannelID;
 import eu.stratosphere.nephele.ipc.RPC;
 import eu.stratosphere.nephele.ipc.Server;
@@ -85,7 +84,7 @@ import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.jobmanager.accumulators.AccumulatorManager;
 import eu.stratosphere.nephele.jobmanager.archive.ArchiveListener;
 import eu.stratosphere.nephele.jobmanager.archive.MemoryArchivist;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
+import eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler;
 import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
 import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitManager;
 import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitWrapper;
@@ -106,6 +105,7 @@ import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
 import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
 import eu.stratosphere.runtime.io.network.RemoteReceiver;
 import eu.stratosphere.nephele.taskmanager.ExecutorThreadFactory;
+import eu.stratosphere.nephele.taskmanager.transferenvelope.RegisterTaskManagerResult;
 import eu.stratosphere.nephele.topology.NetworkTopology;
 import eu.stratosphere.nephele.types.IntegerRecord;
 import eu.stratosphere.nephele.util.SerializableArrayList;
@@ -135,7 +135,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 
 	private final InputSplitManager inputSplitManager;
 
-	private final AbstractScheduler scheduler;
+	private final DefaultScheduler scheduler;
 	
 	private AccumulatorManager accumulatorManager;
 
@@ -213,20 +213,11 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		LOG.info("Starting job manager in " + executionMode + " mode");
 
 		// Try to load the instance manager for the given execution mode
-		// Try to load the scheduler for the given execution mode
-		if (executionMode == ExecutionMode.LOCAL) {
-			try {
-				this.instanceManager = new LocalInstanceManager();
-			} catch (Throwable t) {
-				throw new Exception("Cannot instantiate local instance manager: " + t.getMessage(), t);
-			}
-		} else {
-			final String instanceManagerClassName = JobManagerUtils.getInstanceManagerClassName(executionMode);
-			LOG.info("Trying to load " + instanceManagerClassName + " as instance manager");
-			this.instanceManager = JobManagerUtils.loadInstanceManager(instanceManagerClassName);
-			if (this.instanceManager == null) {
-				throw new Exception("Unable to load instance manager " + instanceManagerClassName);
-			}
+		final String instanceManagerClassName = JobManagerUtils.getInstanceManagerClassName(executionMode);
+		LOG.info("Trying to load " + instanceManagerClassName + " as instance manager");
+		this.instanceManager = JobManagerUtils.loadInstanceManager(instanceManagerClassName);
+		if (this.instanceManager == null) {
+			throw new Exception("Unable to load instance manager " + instanceManagerClassName);
 		}
 
 		// Try to load the scheduler for the given execution mode
@@ -479,7 +470,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			ExecutionGraph eg;
 	
 			try {
-				eg = new ExecutionGraph(job, this.instanceManager);
+				eg = new ExecutionGraph(job, this.getAvailableSlots());
 			} catch (GraphConversionException e) {
 				if (e.getCause() == null) {
 					return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e));
@@ -520,7 +511,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			}
 	
 			try {
-				this.scheduler.schedulJob(eg);
+				this.scheduler.scheduleJob(eg);
 			} catch (SchedulingException e) {
 				unregisterJob(eg);
 				JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e));
@@ -561,10 +552,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			}
 		}
 
-		// Cancel all pending requests for instances
-		this.instanceManager.cancelPendingRequests(executionGraph.getJobID()); // getJobID is final member, no
-																				// synchronization necessary
-
 		// Remove job from input split manager
 		if (this.inputSplitManager != null) {
 			this.inputSplitManager.unregisterJob(executionGraph);
@@ -582,8 +569,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 
 
 	@Override
-	public void sendHeartbeat(final InstanceConnectionInfo instanceConnectionInfo,
-			final HardwareDescription hardwareDescription) {
+	public void sendHeartbeat(final InstanceConnectionInfo instanceConnectionInfo) {
 
 		// Delegate call to instance manager
 		if (this.instanceManager != null) {
@@ -592,7 +578,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 
 				@Override
 				public void run() {
-					instanceManager.reportHeartBeat(instanceConnectionInfo, hardwareDescription);
+					instanceManager.reportHeartBeat(instanceConnectionInfo);
 				}
 			};
 
@@ -600,6 +586,25 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		}
 	}
 
+	@Override
+	public RegisterTaskManagerResult registerTaskManager(final InstanceConnectionInfo instanceConnectionInfo,
+									final HardwareDescription hardwareDescription, final IntegerRecord numberOfSlots){
+		if(this.instanceManager != null) {
+			final Runnable registerTaskManagerRunnable = new Runnable() {
+				@Override
+				public void run(){
+					instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription,
+							numberOfSlots.getValue());
+				}
+			};
+
+			this.executorService.execute(registerTaskManagerRunnable);
+			return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.SUCCESS);
+		}
+
+		return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.FAILURE);
+	}
+
 
 	@Override
 	public void updateTaskExecutionState(final TaskExecutionState executionState) throws IOException {
@@ -730,9 +735,10 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 
 		if (sourceChannelID.equals(edge.getInputChannelID())) {
 			// Request was sent from an input channel
+
 			final ExecutionVertex connectedVertex = edge.getOutputGate().getVertex();
 
-			final AbstractInstance assignedInstance = connectedVertex.getAllocatedResource().getInstance();
+			final Instance assignedInstance = connectedVertex.getAllocatedResource().getInstance();
 			if (assignedInstance == null) {
 				LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getOutputGateIndex()
 					+ " but no instance assigned");
@@ -758,6 +764,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelID());
 			} else {
 				// Receiver runs on a different task manager
+
 				final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
 				final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
 
@@ -788,7 +795,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			return ConnectionInfoLookupResponse.createReceiverNotReady();
 		}
 
-		final AbstractInstance assignedInstance = targetVertex.getAllocatedResource().getInstance();
+		final Instance assignedInstance = targetVertex.getAllocatedResource().getInstance();
 		if (assignedInstance == null) {
 			LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID() + " but no instance assigned");
 			// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 4");
@@ -877,6 +884,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		return eventList;
 	}
 
+
 	@Override
 	public void killTask(final JobID jobID, final ManagementVertexID id) throws IOException {
 
@@ -909,10 +917,11 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		eg.executeCommand(runnable);
 	}
 
+
 	@Override
 	public void killInstance(final StringRecord instanceName) throws IOException {
 
-		final AbstractInstance instance = this.instanceManager.getInstanceByName(instanceName.toString());
+		final Instance instance = this.instanceManager.getInstanceByName(instanceName.toString());
 		if (instance == null) {
 			LOG.error("Cannot find instance with name " + instanceName + " to kill it");
 			return;
@@ -947,16 +956,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 	}
 
 
-	public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() {
-
-		// Delegate call to the instance manager
-		if (this.instanceManager != null) {
-			return this.instanceManager.getMapOfAvailableInstanceTypes();
-		}
-
-		return null;
-	}
-
 
 	@Override
 	public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
@@ -987,7 +986,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			return;
 		}
 
-		final Set<AbstractInstance> allocatedInstance = new HashSet<AbstractInstance>();
+		final Set<Instance> allocatedInstance = new HashSet<Instance>();
 
 		final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(eg, true);
 		while (it.hasNext()) {
@@ -995,7 +994,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			final ExecutionVertex vertex = it.next();
 			final ExecutionState state = vertex.getExecutionState();
 			if (state == ExecutionState.RUNNING || state == ExecutionState.FINISHING) {
-				final AbstractInstance instance = vertex.getAllocatedResource().getInstance();
+				final Instance instance = vertex.getAllocatedResource().getInstance();
 
 				if (instance instanceof DummyInstance) {
 					LOG.error("Found instance of type DummyInstance for vertex " + vertex.getName() + " (state "
@@ -1013,7 +1012,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			@Override
 			public void run() {
 
-				final Iterator<AbstractInstance> it2 = allocatedInstance.iterator();
+				final Iterator<Instance> it2 = allocatedInstance.iterator();
 
 				try {
 					while (it2.hasNext()) {
@@ -1030,9 +1029,14 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		this.executorService.execute(requestRunnable);
 	}
 
+	@Override
+	public int getAvailableSlots() {
+		return getInstanceManager().getNumberOfSlots();
+	}
+
 
 	@Override
-	public void deploy(final JobID jobID, final AbstractInstance instance,
+	public void deploy(final JobID jobID, final Instance instance,
 			final List<ExecutionVertex> verticesToBeDeployed) {
 
 		if (verticesToBeDeployed.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
index 5b0b30d..45506aa 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
@@ -20,12 +20,11 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.Properties;
 
 import eu.stratosphere.nephele.ExecutionMode;
-
+import eu.stratosphere.nephele.instance.InstanceManager;
+import eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
 import eu.stratosphere.util.StringUtils;
 
 /**
@@ -47,7 +46,7 @@ public class JobManagerUtils {
 
 	/**
 	 * Tries to locate a class with given name and to
-	 * instantiate a {@link AbstractScheduler} object from it.
+	 * instantiate a {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} object from it.
 	 * 
 	 * @param schedulerClassName
 	 *        the name of the class to instantiate the scheduler object from
@@ -55,21 +54,21 @@ public class JobManagerUtils {
 	 *        the deployment manager which shall be passed on to the scheduler
 	 * @param instanceManager
 	 *        the instance manager which shall be passed on to the scheduler
-	 * @return the {@link AbstractScheduler} object instantiated from the class with the provided name
+	 * @return the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} object instantiated from the class with the provided name
 	 */
 	@SuppressWarnings("unchecked")
-	static AbstractScheduler loadScheduler(final String schedulerClassName, final DeploymentManager deploymentManager,
+	static DefaultScheduler loadScheduler(final String schedulerClassName, final DeploymentManager deploymentManager,
 			final InstanceManager instanceManager) {
 
-		Class<? extends AbstractScheduler> schedulerClass;
+		Class<? extends DefaultScheduler> schedulerClass;
 		try {
-			schedulerClass = (Class<? extends AbstractScheduler>) Class.forName(schedulerClassName);
+			schedulerClass = (Class<? extends DefaultScheduler>) Class.forName(schedulerClassName);
 		} catch (ClassNotFoundException e) {
 			LOG.error("Cannot find class " + schedulerClassName + ": " + StringUtils.stringifyException(e));
 			return null;
 		}
 
-		Constructor<? extends AbstractScheduler> constructor;
+		Constructor<? extends DefaultScheduler> constructor;
 
 		try {
 
@@ -83,7 +82,7 @@ public class JobManagerUtils {
 			return null;
 		}
 
-		AbstractScheduler scheduler;
+		DefaultScheduler scheduler;
 
 		try {
 			scheduler = constructor.newInstance(deploymentManager, instanceManager);
@@ -110,7 +109,7 @@ public class JobManagerUtils {
 	 * 
 	 * @param instanceManagerClassName
 	 *        the name of the class to instantiate the instance manager object from
-	 * @return the {@link InstanceManager} object instantiated from the class with the provided name
+	 * @return the {@link eu.stratosphere.nephele.instance.InstanceManager} object instantiated from the class with the provided name
 	 */
 	@SuppressWarnings("unchecked")
 	static InstanceManager loadInstanceManager(final String instanceManagerClassName) {
@@ -139,53 +138,34 @@ public class JobManagerUtils {
 	}
 
 	/**
-	 * Tries to read the class name of the {@link AbstractScheduler} implementation from the global configuration which
+	 * Tries to read the class name of the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} implementation from the global configuration which
 	 * is set to be used for the provided execution mode.
 	 * 
 	 * @param executionMode The Nephele execution mode.
-	 * @return the class name of the {@link AbstractScheduler} implementation to be used or <code>null</code> if no
+	 * @return the class name of the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler} implementation to be used or <code>null</code> if no
 	 *         implementation is configured for the given execution mode
 	 */
 	static String getSchedulerClassName(ExecutionMode executionMode) {
-		switch (executionMode) {
-		case LOCAL:
-			return "eu.stratosphere.nephele.jobmanager.scheduler.local.LocalScheduler";
-		case CLUSTER:
-			return "eu.stratosphere.nephele.jobmanager.scheduler.queue.QueueScheduler";
-		default:
-			throw new RuntimeException("Unrecognized Execution Mode.");
-		}
-//		String modeClass = getClassStringForMode(executionMode);
-//		String instanceManagerClassNameKey = "jobmanager.scheduler." + modeClass + ".classname";
-//		String schedulerClassName = GlobalConfiguration.getString(instanceManagerClassNameKey, null);
-//
-//		if (executionMode == ExecutionMode.LOCAL && schedulerClassName == null) {
-//			schedulerClassName = ConfigConstants.DEFAULT_LOCAL_MODE_SCHEDULER;
-//		}
-//		return schedulerClassName;
+		return "eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler";
 	}
 
 	/**
-	 * Tries to read the class name of the {@link InstanceManager} implementation from the global configuration which is
+	 * Tries to read the class name of the {@link eu.stratosphere.nephele.instance.InstanceManager} implementation from the global configuration which is
 	 * set to be used for the provided execution mode.
 	 * 
 	 * @param executionMode The Nephele execution mode.
-	 * @return the class name of the {@link InstanceManager} implementation to be used or <code>null</code> if no
+	 * @return the class name of the {@link eu.stratosphere.nephele.instance.InstanceManager} implementation to be used or <code>null</code> if no
 	 *         implementation is configured for the given execution mode
 	 */
 	static String getInstanceManagerClassName(ExecutionMode executionMode) {
 		switch (executionMode) {
 		case LOCAL:
-			return "eu.stratosphere.nephele.instance.local.LocalInstanceManager";
+			return "eu.stratosphere.nephele.instance.LocalInstanceManager";
 		case CLUSTER:
-			return "eu.stratosphere.nephele.instance.cluster.ClusterManager";
+			return "eu.stratosphere.nephele.instance.DefaultInstanceManager";
 		default:
 			throw new RuntimeException("Unrecognized Execution Mode.");
 		}
-//		
-//		final String modeClass = getClassStringForMode(executionMode);
-//		final String instanceManagerClassNameKey = "jobmanager.instancemanager." + modeClass + ".classname";
-//		return GlobalConfiguration.getString(instanceManagerClassNameKey, null);
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractExecutionListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractExecutionListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractExecutionListener.java
deleted file mode 100644
index 5b528c7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractExecutionListener.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import eu.stratosphere.nephele.execution.ExecutionListener;
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
-import eu.stratosphere.nephele.executiongraph.ExecutionPipeline;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-public abstract class AbstractExecutionListener implements ExecutionListener {
-
-	/**
-	 * The instance of the {@link LocalScheduler}.
-	 */
-	private final AbstractScheduler scheduler;
-
-	/**
-	 * The {@link ExecutionVertex} this wrapper object belongs to.
-	 */
-	private final ExecutionVertex executionVertex;
-
-	/**
-	 * Constructs a new wrapper object for the given {@link ExecutionVertex}.
-	 * 
-	 * @param AbstractScheduler
-	 *        the instance of the {@link AbstractScheduler}
-	 * @param executionVertex
-	 *        the {@link ExecutionVertex} the received notification refer to
-	 */
-	public AbstractExecutionListener(final AbstractScheduler scheduler, final ExecutionVertex executionVertex) {
-		this.scheduler = scheduler;
-		this.executionVertex = executionVertex;
-	}
-
-
-	@Override
-	public void executionStateChanged(final JobID jobID, final ExecutionVertexID vertexID,
-			final ExecutionState newExecutionState, final String optionalMessage) {
-
-		final ExecutionGraph eg = this.executionVertex.getExecutionGraph();
-
-		// Check if we can deploy a new pipeline.
-		if (newExecutionState == ExecutionState.FINISHING) {
-
-			final ExecutionPipeline pipeline = this.executionVertex.getExecutionPipeline();
-			if (!pipeline.isFinishing()) {
-				// Some tasks of the pipeline are still running
-				return;
-			}
-
-			// Find another vertex in the group which is still in SCHEDULED state and get its pipeline.
-			final ExecutionGroupVertex groupVertex = this.executionVertex.getGroupVertex();
-			for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
-				final ExecutionVertex groupMember = groupVertex.getGroupMember(i);
-				if (groupMember.compareAndUpdateExecutionState(ExecutionState.SCHEDULED, ExecutionState.ASSIGNED)) {
-
-					final ExecutionPipeline pipelineToBeDeployed = groupMember.getExecutionPipeline();
-					pipelineToBeDeployed.setAllocatedResource(this.executionVertex.getAllocatedResource());
-					pipelineToBeDeployed.updateExecutionState(ExecutionState.ASSIGNED);
-
-					this.scheduler.deployAssignedPipeline(pipelineToBeDeployed);
-					return;
-				}
-			}
-		}
-
-		if (newExecutionState == ExecutionState.CANCELED || newExecutionState == ExecutionState.FINISHED) {
-
-			synchronized (eg) {
-
-				if (this.scheduler.getVerticesToBeRestarted().remove(this.executionVertex.getID()) != null) {
-
-					if (eg.getJobStatus() == InternalJobStatus.FAILING) {
-						return;
-					}
-
-					this.executionVertex.updateExecutionState(ExecutionState.ASSIGNED, "Restart as part of recovery");
-
-					// Run through the deployment procedure
-					this.scheduler.deployAssignedVertices(this.executionVertex);
-					return;
-				}
-			}
-		}
-
-		if (newExecutionState == ExecutionState.FINISHED || newExecutionState == ExecutionState.CANCELED
-			|| newExecutionState == ExecutionState.FAILED) {
-			// Check if instance can be released
-			this.scheduler.checkAndReleaseAllocatedResource(eg, this.executionVertex.getAllocatedResource());
-		}
-
-		// In case of an error, check if the vertex shall be recovered
-		if (newExecutionState == ExecutionState.FAILED) {
-			if (this.executionVertex.decrementRetriesLeftAndCheck()) {
-
-				final Set<ExecutionVertex> assignedVertices = new HashSet<ExecutionVertex>();
-
-				if (RecoveryLogic.recover(this.executionVertex, this.scheduler.getVerticesToBeRestarted(),
-					assignedVertices)) {
-
-					if (RecoveryLogic.hasInstanceAssigned(this.executionVertex)) {
-						// Run through the deployment procedure
-						this.scheduler.deployAssignedVertices(assignedVertices);
-					}
-
-				} else {
-
-					// Make sure the map with the vertices to be restarted is cleaned up properly
-					synchronized (eg) {
-
-						final Iterator<ExecutionVertex> it = this.scheduler.getVerticesToBeRestarted().values()
-							.iterator();
-
-						while (it.hasNext()) {
-							if (eg.equals(it.next().getExecutionGraph())) {
-								it.remove();
-							}
-						}
-					}
-
-					// Actual cancellation of job is performed by job manager
-				}
-			}
-		}
-
-	}
-
-
-	@Override
-	public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
-		// Nothing to do here
-	}
-
-
-	@Override
-	public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
-		// Nothing to do here
-	}
-
-
-	@Override
-	public int getPriority() {
-
-		return 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
deleted file mode 100644
index 24e2970..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
+++ /dev/null
@@ -1,662 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
-import eu.stratosphere.nephele.executiongraph.ExecutionGate;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
-import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
-import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertexIterator;
-import eu.stratosphere.nephele.executiongraph.ExecutionPipeline;
-import eu.stratosphere.nephele.executiongraph.ExecutionStage;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.AllocationID;
-import eu.stratosphere.nephele.instance.DummyInstance;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.jobmanager.DeploymentManager;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * This abstract scheduler must be extended by a scheduler implementations for Nephele. The abstract class defines the
- * fundamental methods for scheduling and removing jobs. While Nephele's
- * {@link eu.stratosphere.nephele.jobmanager.JobManager} is responsible for requesting the required instances for the
- * job at the {@link eu.stratosphere.nephele.instance.InstanceManager}, the scheduler is in charge of assigning the
- * individual tasks to the instances.
- * 
- */
-public abstract class AbstractScheduler implements InstanceListener {
-
-	/**
-	 * The LOG object to report events within the scheduler.
-	 */
-	protected static final Log LOG = LogFactory.getLog(AbstractScheduler.class);
-
-	/**
-	 * The instance manager assigned to this scheduler.
-	 */
-	private final InstanceManager instanceManager;
-
-	/**
-	 * The deployment manager assigned to this scheduler.
-	 */
-	private final DeploymentManager deploymentManager;
-
-	/**
-	 * Stores the vertices to be restarted once they have switched to the <code>CANCELED</code> state.
-	 */
-	private final Map<ExecutionVertexID, ExecutionVertex> verticesToBeRestarted = new ConcurrentHashMap<ExecutionVertexID, ExecutionVertex>();
-
-	/**
-	 * Constructs a new abstract scheduler.
-	 * 
-	 * @param deploymentManager
-	 *        the deployment manager assigned to this scheduler
-	 * @param instanceManager
-	 *        the instance manager to be used with this scheduler
-	 */
-	protected AbstractScheduler(final DeploymentManager deploymentManager, final InstanceManager instanceManager) {
-
-		this.deploymentManager = deploymentManager;
-		this.instanceManager = instanceManager;
-		this.instanceManager.setInstanceListener(this);
-	}
-
-	/**
-	 * Adds a job represented by an {@link ExecutionGraph} object to the scheduler. The job is then executed according
-	 * to the strategies of the concrete scheduler implementation.
-	 * 
-	 * @param executionGraph
-	 *        the job to be added to the scheduler
-	 * @throws SchedulingException
-	 *         thrown if an error occurs and the scheduler does not accept the new job
-	 */
-	public abstract void schedulJob(ExecutionGraph executionGraph) throws SchedulingException;
-
-	/**
-	 * Returns the execution graph which is associated with the given job ID.
-	 * 
-	 * @param jobID
-	 *        the job ID to search the execution graph for
-	 * @return the execution graph which belongs to the given job ID or <code>null</code if no such execution graph
-	 *         exists
-	 */
-	public abstract ExecutionGraph getExecutionGraphByID(JobID jobID);
-
-	/**
-	 * Returns the {@link InstanceManager} object which is used by the current scheduler.
-	 * 
-	 * @return the {@link InstanceManager} object which is used by the current scheduler
-	 */
-	public InstanceManager getInstanceManager() {
-		return this.instanceManager;
-	}
-
-	// void removeJob(JobID jobID);
-
-	/**
-	 * Shuts the scheduler down. After shut down no jobs can be added to the scheduler.
-	 */
-	public abstract void shutdown();
-
-	/**
-	 * Collects the instances required to run the job from the given {@link ExecutionStage} and requests them at the
-	 * loaded instance manager.
-	 * 
-	 * @param executionStage
-	 *        the execution stage to collect the required instances from
-	 * @throws InstanceException
-	 *         thrown if the given execution graph is already processing its final stage
-	 */
-	protected void requestInstances(final ExecutionStage executionStage) throws InstanceException {
-
-		final ExecutionGraph executionGraph = executionStage.getExecutionGraph();
-		final InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
-
-		synchronized (executionStage) {
-
-			executionStage.collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
-
-			final Iterator<Map.Entry<InstanceType, Integer>> it = instanceRequestMap.getMinimumIterator();
-			LOG.info("Requesting the following instances for job " + executionGraph.getJobID());
-			while (it.hasNext()) {
-				final Map.Entry<InstanceType, Integer> entry = it.next();
-				LOG.info(" " + entry.getKey() + " [" + entry.getValue().intValue() + ", "
-					+ instanceRequestMap.getMaximumNumberOfInstances(entry.getKey()) + "]");
-			}
-
-			if (instanceRequestMap.isEmpty()) {
-				return;
-			}
-
-			this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(),
-				instanceRequestMap, null);
-
-			// Switch vertex state to assigning
-			final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, executionGraph
-				.getIndexOfCurrentExecutionStage(), true, true);
-			while (it2.hasNext()) {
-
-				it2.next().compareAndUpdateExecutionState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
-			}
-		}
-	}
-
-	void findVerticesToBeDeployed(final ExecutionVertex vertex,
-			final Map<AbstractInstance, List<ExecutionVertex>> verticesToBeDeployed,
-			final Set<ExecutionVertex> alreadyVisited) {
-
-		if (!alreadyVisited.add(vertex)) {
-			return;
-		}
-
-		if (vertex.compareAndUpdateExecutionState(ExecutionState.ASSIGNED, ExecutionState.READY)) {
-			final AbstractInstance instance = vertex.getAllocatedResource().getInstance();
-
-			if (instance instanceof DummyInstance) {
-				LOG.error("Inconsistency: Vertex " + vertex + " is about to be deployed on a DummyInstance");
-			}
-
-			List<ExecutionVertex> verticesForInstance = verticesToBeDeployed.get(instance);
-			if (verticesForInstance == null) {
-				verticesForInstance = new ArrayList<ExecutionVertex>();
-				verticesToBeDeployed.put(instance, verticesForInstance);
-			}
-
-			verticesForInstance.add(vertex);
-		}
-
-		final int numberOfOutputGates = vertex.getNumberOfOutputGates();
-		for (int i = 0; i < numberOfOutputGates; ++i) {
-
-			final ExecutionGate outputGate = vertex.getOutputGate(i);
-			boolean deployTarget;
-
-			switch (outputGate.getChannelType()) {
-			case NETWORK:
-				deployTarget = false;
-				break;
-			case IN_MEMORY:
-				deployTarget = true;
-				break;
-			default:
-				throw new IllegalStateException("Unknown channel type");
-			}
-
-			if (deployTarget) {
-
-				final int numberOfOutputChannels = outputGate.getNumberOfEdges();
-				for (int j = 0; j < numberOfOutputChannels; ++j) {
-					final ExecutionEdge outputChannel = outputGate.getEdge(j);
-					final ExecutionVertex connectedVertex = outputChannel.getInputGate().getVertex();
-					findVerticesToBeDeployed(connectedVertex, verticesToBeDeployed, alreadyVisited);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Collects all execution vertices with the state ASSIGNED starting from the given start vertex and
-	 * deploys them on the assigned {@link AllocatedResource} objects.
-	 * 
-	 * @param startVertex
-	 *        the execution vertex to start the deployment from
-	 */
-	public void deployAssignedVertices(final ExecutionVertex startVertex) {
-
-		final JobID jobID = startVertex.getExecutionGraph().getJobID();
-
-		final Map<AbstractInstance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<AbstractInstance, List<ExecutionVertex>>();
-		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-
-		findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
-
-		if (!verticesToBeDeployed.isEmpty()) {
-
-			final Iterator<Map.Entry<AbstractInstance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-				.entrySet()
-				.iterator();
-
-			while (it2.hasNext()) {
-
-				final Map.Entry<AbstractInstance, List<ExecutionVertex>> entry = it2.next();
-				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
-			}
-		}
-	}
-
-	/**
-	 * Collects all execution vertices with the state ASSIGNED from the given pipeline and deploys them on the assigned
-	 * {@link AllocatedResource} objects.
-	 * 
-	 * @param pipeline
-	 *        the execution pipeline to be deployed
-	 */
-	public void deployAssignedPipeline(final ExecutionPipeline pipeline) {
-
-		final JobID jobID = null;
-
-		final Map<AbstractInstance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<AbstractInstance, List<ExecutionVertex>>();
-		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-
-		final Iterator<ExecutionVertex> it = pipeline.iterator();
-		while (it.hasNext()) {
-			findVerticesToBeDeployed(it.next(), verticesToBeDeployed, alreadyVisited);
-		}
-
-		if (!verticesToBeDeployed.isEmpty()) {
-
-			final Iterator<Map.Entry<AbstractInstance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-				.entrySet()
-				.iterator();
-
-			while (it2.hasNext()) {
-
-				final Map.Entry<AbstractInstance, List<ExecutionVertex>> entry = it2.next();
-				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
-			}
-		}
-	}
-
-	/**
-	 * Collects all execution vertices with the state ASSIGNED starting from the given collection of start vertices and
-	 * deploys them on the assigned {@link AllocatedResource} objects.
-	 * 
-	 * @param startVertices
-	 *        the collection of execution vertices to start the deployment from
-	 */
-	public void deployAssignedVertices(final Collection<ExecutionVertex> startVertices) {
-
-		JobID jobID = null;
-
-		final Map<AbstractInstance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<AbstractInstance, List<ExecutionVertex>>();
-		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-
-		for (final ExecutionVertex startVertex : startVertices) {
-
-			if (jobID == null) {
-				jobID = startVertex.getExecutionGraph().getJobID();
-			}
-
-			findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
-		}
-
-		if (!verticesToBeDeployed.isEmpty()) {
-
-			final Iterator<Map.Entry<AbstractInstance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-				.entrySet()
-				.iterator();
-
-			while (it2.hasNext()) {
-
-				final Map.Entry<AbstractInstance, List<ExecutionVertex>> entry = it2.next();
-				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
-			}
-		}
-	}
-
-	/**
-	 * Collects all execution vertices with the state ASSIGNED starting from the input vertices of the current execution
-	 * stage and deploys them on the assigned {@link AllocatedResource} objects.
-	 * 
-	 * @param executionGraph
-	 *        the execution graph to collect the vertices from
-	 */
-	public void deployAssignedInputVertices(final ExecutionGraph executionGraph) {
-
-		final Map<AbstractInstance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<AbstractInstance, List<ExecutionVertex>>();
-		final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
-
-		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-
-		for (int i = 0; i < executionStage.getNumberOfStageMembers(); ++i) {
-
-			final ExecutionGroupVertex startVertex = executionStage.getStageMember(i);
-			if (!startVertex.isInputVertex()) {
-				continue;
-			}
-
-			for (int j = 0; j < startVertex.getCurrentNumberOfGroupMembers(); ++j) {
-				final ExecutionVertex vertex = startVertex.getGroupMember(j);
-				findVerticesToBeDeployed(vertex, verticesToBeDeployed, alreadyVisited);
-			}
-		}
-
-		if (!verticesToBeDeployed.isEmpty()) {
-
-			final Iterator<Map.Entry<AbstractInstance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-				.entrySet()
-				.iterator();
-
-			while (it2.hasNext()) {
-
-				final Map.Entry<AbstractInstance, List<ExecutionVertex>> entry = it2.next();
-				this.deploymentManager.deploy(executionGraph.getJobID(), entry.getKey(), entry.getValue());
-			}
-		}
-	}
-
-
-	@Override
-	public void resourcesAllocated(final JobID jobID, final List<AllocatedResource> allocatedResources) {
-
-		if (allocatedResources == null) {
-			LOG.error("Resource to lock is null!");
-			return;
-		}
-
-		for (final AllocatedResource allocatedResource : allocatedResources) {
-			if (allocatedResource.getInstance() instanceof DummyInstance) {
-				LOG.debug("Available instance is of type DummyInstance!");
-				return;
-			}
-		}
-
-		final ExecutionGraph eg = getExecutionGraphByID(jobID);
-
-		if (eg == null) {
-			/*
-			 * The job have have been canceled in the meantime, in this case
-			 * we release the instance immediately.
-			 */
-			try {
-				for (final AllocatedResource allocatedResource : allocatedResources) {
-					getInstanceManager().releaseAllocatedResource(jobID, null, allocatedResource);
-				}
-			} catch (InstanceException e) {
-				LOG.error(e);
-			}
-			return;
-		}
-
-		final Runnable command = new Runnable() {
-
-			/**
-			 * {@inheritDoc}
-			 */
-			@Override
-			public void run() {
-
-				final ExecutionStage stage = eg.getCurrentExecutionStage();
-
-				synchronized (stage) {
-
-					for (final AllocatedResource allocatedResource : allocatedResources) {
-
-						AllocatedResource resourceToBeReplaced = null;
-						// Important: only look for instances to be replaced in the current stage
-						final Iterator<ExecutionGroupVertex> groupIterator = new ExecutionGroupVertexIterator(eg, true,
-							stage.getStageNumber());
-						while (groupIterator.hasNext()) {
-
-							final ExecutionGroupVertex groupVertex = groupIterator.next();
-							for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
-
-								final ExecutionVertex vertex = groupVertex.getGroupMember(i);
-
-								if (vertex.getExecutionState() == ExecutionState.SCHEDULED
-									&& vertex.getAllocatedResource() != null) {
-									// In local mode, we do not consider any topology, only the instance type
-									if (vertex.getAllocatedResource().getInstanceType().equals(
-										allocatedResource.getInstanceType())) {
-										resourceToBeReplaced = vertex.getAllocatedResource();
-										break;
-									}
-								}
-							}
-
-							if (resourceToBeReplaced != null) {
-								break;
-							}
-						}
-
-						// For some reason, we don't need this instance
-						if (resourceToBeReplaced == null) {
-							LOG.error("Instance " + allocatedResource.getInstance() + " is not required for job"
-								+ eg.getJobID());
-							try {
-								getInstanceManager().releaseAllocatedResource(jobID, eg.getJobConfiguration(),
-									allocatedResource);
-							} catch (InstanceException e) {
-								LOG.error(e);
-							}
-							return;
-						}
-
-						// Replace the selected instance
-						final Iterator<ExecutionVertex> it = resourceToBeReplaced.assignedVertices();
-						while (it.hasNext()) {
-							final ExecutionVertex vertex = it.next();
-							vertex.setAllocatedResource(allocatedResource);
-							vertex.updateExecutionState(ExecutionState.ASSIGNED);
-						}
-					}
-				}
-
-				// Deploy the assigned vertices
-				deployAssignedInputVertices(eg);
-
-			}
-
-		};
-
-		eg.executeCommand(command);
-	}
-
-	/**
-	 * Checks if the given {@link AllocatedResource} is still required for the
-	 * execution of the given execution graph. If the resource is no longer
-	 * assigned to a vertex that is either currently running or about to run
-	 * the given resource is returned to the instance manager for deallocation.
-	 * 
-	 * @param executionGraph
-	 *        the execution graph the provided resource has been used for so far
-	 * @param allocatedResource
-	 *        the allocated resource to check the assignment for
-	 */
-	public void checkAndReleaseAllocatedResource(final ExecutionGraph executionGraph,
-			final AllocatedResource allocatedResource) {
-
-		if (allocatedResource == null) {
-			LOG.error("Resource to lock is null!");
-			return;
-		}
-
-		if (allocatedResource.getInstance() instanceof DummyInstance) {
-			LOG.debug("Available instance is of type DummyInstance!");
-			return;
-		}
-
-		boolean resourceCanBeReleased = true;
-		final Iterator<ExecutionVertex> it = allocatedResource.assignedVertices();
-		while (it.hasNext()) {
-			final ExecutionVertex vertex = it.next();
-			final ExecutionState state = vertex.getExecutionState();
-
-			if (state != ExecutionState.CREATED && state != ExecutionState.FINISHED
-				&& state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
-
-				resourceCanBeReleased = false;
-				break;
-			}
-		}
-
-		if (resourceCanBeReleased) {
-
-			LOG.info("Releasing instance " + allocatedResource.getInstance());
-			try {
-				getInstanceManager().releaseAllocatedResource(executionGraph.getJobID(), executionGraph
-					.getJobConfiguration(), allocatedResource);
-			} catch (InstanceException e) {
-				LOG.error(StringUtils.stringifyException(e));
-			}
-		}
-	}
-
-	DeploymentManager getDeploymentManager() {
-		return this.deploymentManager;
-	}
-
-	protected void replayCheckpointsFromPreviousStage(final ExecutionGraph executionGraph) {
-
-		final int currentStageIndex = executionGraph.getIndexOfCurrentExecutionStage();
-		final ExecutionStage previousStage = executionGraph.getStage(currentStageIndex - 1);
-
-		final List<ExecutionVertex> verticesToBeReplayed = new ArrayList<ExecutionVertex>();
-
-		for (int i = 0; i < previousStage.getNumberOfOutputExecutionVertices(); ++i) {
-
-			final ExecutionVertex vertex = previousStage.getOutputExecutionVertex(i);
-			vertex.updateExecutionState(ExecutionState.ASSIGNED);
-			verticesToBeReplayed.add(vertex);
-		}
-
-		deployAssignedVertices(verticesToBeReplayed);
-	}
-
-	/**
-	 * Returns a map of vertices to be restarted once they have switched to their <code>CANCELED</code> state.
-	 * 
-	 * @return the map of vertices to be restarted
-	 */
-	Map<ExecutionVertexID, ExecutionVertex> getVerticesToBeRestarted() {
-
-		return this.verticesToBeRestarted;
-	}
-
-
-	@Override
-	public void allocatedResourcesDied(final JobID jobID, final List<AllocatedResource> allocatedResources) {
-
-		final ExecutionGraph eg = getExecutionGraphByID(jobID);
-
-		if (eg == null) {
-			LOG.error("Cannot find execution graph for job with ID " + jobID);
-			return;
-		}
-
-		final Runnable command = new Runnable() {
-
-			/**
-			 * {@inheritDoc}
-			 */
-			@Override
-			public void run() {
-
-				synchronized (eg) {
-
-					for (final AllocatedResource allocatedResource : allocatedResources) {
-
-						LOG.info("Resource " + allocatedResource.getInstance().getName() + " for Job " + jobID
-							+ " died.");
-
-						final ExecutionGraph executionGraph = getExecutionGraphByID(jobID);
-
-						if (executionGraph == null) {
-							LOG.error("Cannot find execution graph for job " + jobID);
-							return;
-						}
-
-						Iterator<ExecutionVertex> vertexIter = allocatedResource.assignedVertices();
-
-						// Assign vertices back to a dummy resource.
-						final DummyInstance dummyInstance = DummyInstance.createDummyInstance(allocatedResource
-							.getInstance()
-							.getType());
-						final AllocatedResource dummyResource = new AllocatedResource(dummyInstance,
-							allocatedResource.getInstanceType(), new AllocationID());
-
-						while (vertexIter.hasNext()) {
-							final ExecutionVertex vertex = vertexIter.next();
-							vertex.setAllocatedResource(dummyResource);
-						}
-
-						final String failureMessage = allocatedResource.getInstance().getName() + " died";
-
-						vertexIter = allocatedResource.assignedVertices();
-
-						while (vertexIter.hasNext()) {
-							final ExecutionVertex vertex = vertexIter.next();
-							final ExecutionState state = vertex.getExecutionState();
-
-							switch (state) {
-							case ASSIGNED:
-							case READY:
-							case STARTING:
-							case RUNNING:
-							case FINISHING:
-
-							vertex.updateExecutionState(ExecutionState.FAILED, failureMessage);
-
-							break;
-						default:
-							}
-					}
-
-					// TODO: Fix this
-					/*
-					 * try {
-					 * requestInstances(this.executionVertex.getGroupVertex().getExecutionStage());
-					 * } catch (InstanceException e) {
-					 * e.printStackTrace();
-					 * // TODO: Cancel the entire job in this case
-					 * }
-					 */
-				}
-			}
-
-			final InternalJobStatus js = eg.getJobStatus();
-			if (js != InternalJobStatus.FAILING && js != InternalJobStatus.FAILED) {
-
-				// TODO: Fix this
-				// deployAssignedVertices(eg);
-
-				final ExecutionStage stage = eg.getCurrentExecutionStage();
-
-				try {
-					requestInstances(stage);
-				} catch (InstanceException e) {
-					e.printStackTrace();
-					// TODO: Cancel the entire job in this case
-				}
-			}
-		}
-		};
-
-		eg.executeCommand(command);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultExecutionListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultExecutionListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultExecutionListener.java
new file mode 100644
index 0000000..86b3c40
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultExecutionListener.java
@@ -0,0 +1,127 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.jobmanager.scheduler;
+
+import eu.stratosphere.nephele.execution.ExecutionListener;
+import eu.stratosphere.nephele.execution.ExecutionState;
+import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
+import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
+import eu.stratosphere.nephele.executiongraph.ExecutionPipeline;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+public class DefaultExecutionListener implements ExecutionListener {
+
+	/**
+	 * The instance of the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler}.
+	 */
+	private final DefaultScheduler scheduler;
+
+	/**
+	 * The {@link ExecutionVertex} this wrapper object belongs to.
+	 */
+	private final ExecutionVertex executionVertex;
+
+	/**
+	 * Constructs a new wrapper object for the given {@link ExecutionVertex}.
+	 * 
+	 * @param scheduler
+	 *        the instance of the {@link DefaultScheduler}
+	 * @param executionVertex
+	 *        the {@link ExecutionVertex} the received notification refer to
+	 */
+	public DefaultExecutionListener(final DefaultScheduler scheduler, final ExecutionVertex executionVertex) {
+		this.scheduler = scheduler;
+		this.executionVertex = executionVertex;
+	}
+
+
+	@Override
+	public void executionStateChanged(final JobID jobID, final ExecutionVertexID vertexID,
+			final ExecutionState newExecutionState, final String optionalMessage) {
+
+		final ExecutionGraph eg = this.executionVertex.getExecutionGraph();
+
+		// Check if we can deploy a new pipeline.
+		if (newExecutionState == ExecutionState.FINISHING) {
+
+			final ExecutionPipeline pipeline = this.executionVertex.getExecutionPipeline();
+			if (!pipeline.isFinishing()) {
+				// Some tasks of the pipeline are still running
+				return;
+			}
+
+			// Find another vertex in the group which is still in SCHEDULED state and get its pipeline.
+			final ExecutionGroupVertex groupVertex = this.executionVertex.getGroupVertex();
+			for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
+				final ExecutionVertex groupMember = groupVertex.getGroupMember(i);
+				if (groupMember.compareAndUpdateExecutionState(ExecutionState.SCHEDULED, ExecutionState.ASSIGNED)) {
+
+					final ExecutionPipeline pipelineToBeDeployed = groupMember.getExecutionPipeline();
+					pipelineToBeDeployed.setAllocatedResource(this.executionVertex.getAllocatedResource());
+					pipelineToBeDeployed.updateExecutionState(ExecutionState.ASSIGNED);
+
+					this.scheduler.deployAssignedPipeline(pipelineToBeDeployed);
+					return;
+				}
+			}
+		}
+
+		if (newExecutionState == ExecutionState.CANCELED || newExecutionState == ExecutionState.FINISHED) {
+
+			synchronized (eg) {
+
+				if (this.scheduler.getVerticesToBeRestarted().remove(this.executionVertex.getID()) != null) {
+
+					if (eg.getJobStatus() == InternalJobStatus.FAILING) {
+						return;
+					}
+
+					this.executionVertex.updateExecutionState(ExecutionState.ASSIGNED, "Restart as part of recovery");
+
+					// Run through the deployment procedure
+					this.scheduler.deployAssignedVertices(this.executionVertex);
+					return;
+				}
+			}
+		}
+
+		if (newExecutionState == ExecutionState.FINISHED || newExecutionState == ExecutionState.CANCELED
+			|| newExecutionState == ExecutionState.FAILED) {
+			// Check if instance can be released
+			this.scheduler.checkAndReleaseAllocatedResource(eg, this.executionVertex.getAllocatedResource());
+		}
+	}
+
+
+	@Override
+	public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
+		// Nothing to do here
+	}
+
+
+	@Override
+	public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
+		// Nothing to do here
+	}
+
+
+	@Override
+	public int getPriority() {
+
+		return 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultScheduler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultScheduler.java
new file mode 100644
index 0000000..745b199
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultScheduler.java
@@ -0,0 +1,762 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.jobmanager.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Deque;
+import java.util.ArrayDeque;
+
+import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
+import eu.stratosphere.nephele.executiongraph.ExecutionGate;
+import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
+import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
+import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
+import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertexIterator;
+import eu.stratosphere.nephele.executiongraph.ExecutionPipeline;
+import eu.stratosphere.nephele.executiongraph.ExecutionStage;
+import eu.stratosphere.nephele.executiongraph.ExecutionStageListener;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
+import eu.stratosphere.nephele.executiongraph.JobStatusListener;
+import eu.stratosphere.nephele.instance.AllocatedResource;
+import eu.stratosphere.nephele.instance.AllocationID;
+import eu.stratosphere.nephele.instance.DummyInstance;
+import eu.stratosphere.nephele.instance.InstanceException;
+import eu.stratosphere.nephele.instance.InstanceListener;
+import eu.stratosphere.nephele.instance.InstanceManager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import eu.stratosphere.nephele.execution.ExecutionState;
+import eu.stratosphere.nephele.instance.Instance;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.jobmanager.DeploymentManager;
+import eu.stratosphere.util.StringUtils;
+
+/**
+ * The default scheduler for Nephele. While Nephele's
+ * {@link eu.stratosphere.nephele.jobmanager.JobManager} is responsible for requesting the required instances for the
+ * job at the {@link eu.stratosphere.nephele.instance.InstanceManager}, the scheduler is in charge of assigning the
+ * individual tasks to the instances.
+ * 
+ */
+public class DefaultScheduler implements InstanceListener, JobStatusListener, ExecutionStageListener {
+
+	/**
+	 * The LOG object to report events within the scheduler.
+	 */
+	protected static final Log LOG = LogFactory.getLog(DefaultScheduler.class);
+
+	/**
+	 * The instance manager assigned to this scheduler.
+	 */
+	private final InstanceManager instanceManager;
+
+	/**
+	 * The deployment manager assigned to this scheduler.
+	 */
+	private final DeploymentManager deploymentManager;
+
+	/**
+	 * Stores the vertices to be restarted once they have switched to the <code>CANCELED</code> state.
+	 */
+	private final Map<ExecutionVertexID, ExecutionVertex> verticesToBeRestarted = new ConcurrentHashMap<ExecutionVertexID, ExecutionVertex>();
+
+	/**
+	 * The job queue where all submitted jobs go to.
+	 */
+	private Deque<ExecutionGraph> jobQueue = new ArrayDeque<ExecutionGraph>();
+
+	/**
+	 * Constructs a new abstract scheduler.
+	 * 
+	 * @param deploymentManager
+	 *        the deployment manager assigned to this scheduler
+	 * @param instanceManager
+	 *        the instance manager to be used with this scheduler
+	 */
+	public DefaultScheduler(final DeploymentManager deploymentManager, final InstanceManager instanceManager) {
+
+		this.deploymentManager = deploymentManager;
+		this.instanceManager = instanceManager;
+		this.instanceManager.setInstanceListener(this);
+	}
+
+	/**
+	 * Removes the job represented by the given {@link ExecutionGraph} from the scheduler.
+	 *
+	 * @param executionGraphToRemove
+	 *        the job to be removed
+	 */
+	void removeJobFromSchedule(final ExecutionGraph executionGraphToRemove) {
+
+		boolean removedFromQueue = false;
+
+		synchronized (this.jobQueue) {
+
+			final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
+			while (it.hasNext()) {
+
+				final ExecutionGraph executionGraph = it.next();
+				if (executionGraph.getJobID().equals(executionGraphToRemove.getJobID())) {
+					removedFromQueue = true;
+					it.remove();
+					break;
+				}
+			}
+		}
+
+		if (!removedFromQueue) {
+			LOG.error("Cannot find job " + executionGraphToRemove.getJobName() + " ("
+					+ executionGraphToRemove.getJobID() + ") to remove");
+		}
+	}
+
+	/**
+	 * Adds a job represented by an {@link ExecutionGraph} object to the scheduler. The job is then executed according
+	 * to the strategies of the concrete scheduler implementation.
+	 *
+	 * @param executionGraph
+	 *        the job to be added to the scheduler
+	 * @throws SchedulingException
+	 *         thrown if an error occurs and the scheduler does not accept the new job
+	 */
+	public void scheduleJob(final ExecutionGraph executionGraph) throws SchedulingException {
+
+		final int requiredSlots = executionGraph.getRequiredSlots();
+		final int availableSlots = this.getInstanceManager().getNumberOfSlots();
+
+		if(requiredSlots > availableSlots){
+			throw new SchedulingException("Not enough slots to schedule job " + executionGraph.getJobID());
+		}
+
+		// Subscribe to job status notifications
+		executionGraph.registerJobStatusListener(this);
+
+		// Register execution listener for each vertex
+		final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, true);
+		while (it2.hasNext()) {
+
+			final ExecutionVertex vertex = it2.next();
+			vertex.registerExecutionListener(new DefaultExecutionListener(this, vertex));
+		}
+
+		// Register the scheduler as an execution stage listener
+		executionGraph.registerExecutionStageListener(this);
+
+		// Add job to the job queue (important to add job to queue before requesting instances)
+		synchronized (this.jobQueue) {
+			this.jobQueue.add(executionGraph);
+		}
+
+		// Request resources for the first stage of the job
+
+		final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
+		try {
+			requestInstances(executionStage);
+		} catch (InstanceException e) {
+			final String exceptionMessage = StringUtils.stringifyException(e);
+			LOG.error(exceptionMessage);
+			this.jobQueue.remove(executionGraph);
+			throw new SchedulingException(exceptionMessage);
+		}
+	}
+
+	/**
+	 * Returns the execution graph which is associated with the given job ID.
+	 *
+	 * @param jobID
+	 *        the job ID to search the execution graph for
+	 * @return the execution graph which belongs to the given job ID or <code>null</code if no such execution graph
+	 *         exists
+	 */
+	public ExecutionGraph getExecutionGraphByID(final JobID jobID) {
+
+		synchronized (this.jobQueue) {
+
+			final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
+			while (it.hasNext()) {
+
+				final ExecutionGraph executionGraph = it.next();
+				if (executionGraph.getJobID().equals(jobID)) {
+					return executionGraph;
+				}
+			}
+		}
+
+		return null;
+	}
+
+	/**
+	 * Shuts the scheduler down. After shut down no jobs can be added to the scheduler.
+	 */
+	public void shutdown() {
+
+		synchronized (this.jobQueue) {
+			this.jobQueue.clear();
+		}
+
+	}
+
+	public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
+									final String optionalMessage) {
+
+		if (newJobStatus == InternalJobStatus.FAILED || newJobStatus == InternalJobStatus.FINISHED
+				|| newJobStatus == InternalJobStatus.CANCELED) {
+			removeJobFromSchedule(executionGraph);
+		}
+	}
+
+	public void nextExecutionStageEntered(final JobID jobID, final ExecutionStage executionStage) {
+
+		// Request new instances if necessary
+		try {
+			requestInstances(executionStage);
+		} catch (InstanceException e) {
+			// TODO: Handle error correctly
+			LOG.error(StringUtils.stringifyException(e));
+		}
+
+		// Deploy the assigned vertices
+		deployAssignedInputVertices(executionStage.getExecutionGraph());
+	}
+
+
+	/**
+	 * Returns the {@link eu.stratosphere.nephele.instance.InstanceManager} object which is used by the current scheduler.
+	 * 
+	 * @return the {@link eu.stratosphere.nephele.instance.InstanceManager} object which is used by the current scheduler
+	 */
+	public InstanceManager getInstanceManager() {
+		return this.instanceManager;
+	}
+
+
+	/**
+	 * Collects the instances required to run the job from the given {@link ExecutionStage} and requests them at the
+	 * loaded instance manager.
+	 * 
+	 * @param executionStage
+	 *        the execution stage to collect the required instances from
+	 * @throws InstanceException
+	 *         thrown if the given execution graph is already processing its final stage
+	 */
+	protected void requestInstances(final ExecutionStage executionStage) throws InstanceException {
+
+		final ExecutionGraph executionGraph = executionStage.getExecutionGraph();
+
+		synchronized (executionStage) {
+
+			final int requiredSlots = executionStage.getRequiredSlots();
+
+			LOG.info("Requesting " + requiredSlots + " for job " + executionGraph.getJobID());
+
+			this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(),
+				requiredSlots);
+
+			// Switch vertex state to assigning
+			final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, executionGraph
+				.getIndexOfCurrentExecutionStage(), true, true);
+			while (it2.hasNext()) {
+
+				it2.next().compareAndUpdateExecutionState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
+			}
+		}
+	}
+
+	void findVerticesToBeDeployed(final ExecutionVertex vertex,
+			final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed,
+			final Set<ExecutionVertex> alreadyVisited) {
+
+		if (!alreadyVisited.add(vertex)) {
+			return;
+		}
+
+		if (vertex.compareAndUpdateExecutionState(ExecutionState.ASSIGNED, ExecutionState.READY)) {
+			final Instance instance = vertex.getAllocatedResource().getInstance();
+
+			if (instance instanceof DummyInstance) {
+				LOG.error("Inconsistency: Vertex " + vertex + " is about to be deployed on a DummyInstance");
+			}
+
+			List<ExecutionVertex> verticesForInstance = verticesToBeDeployed.get(instance);
+			if (verticesForInstance == null) {
+				verticesForInstance = new ArrayList<ExecutionVertex>();
+				verticesToBeDeployed.put(instance, verticesForInstance);
+			}
+
+			verticesForInstance.add(vertex);
+		}
+
+		final int numberOfOutputGates = vertex.getNumberOfOutputGates();
+		for (int i = 0; i < numberOfOutputGates; ++i) {
+
+			final ExecutionGate outputGate = vertex.getOutputGate(i);
+			boolean deployTarget;
+
+			switch (outputGate.getChannelType()) {
+			case NETWORK:
+				deployTarget = false;
+				break;
+			case IN_MEMORY:
+				deployTarget = true;
+				break;
+			default:
+				throw new IllegalStateException("Unknown channel type");
+			}
+
+			if (deployTarget) {
+
+				final int numberOfOutputChannels = outputGate.getNumberOfEdges();
+				for (int j = 0; j < numberOfOutputChannels; ++j) {
+					final ExecutionEdge outputChannel = outputGate.getEdge(j);
+					final ExecutionVertex connectedVertex = outputChannel.getInputGate().getVertex();
+					findVerticesToBeDeployed(connectedVertex, verticesToBeDeployed, alreadyVisited);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Collects all execution vertices with the state ASSIGNED starting from the given start vertex and
+	 * deploys them on the assigned {@link eu.stratosphere.nephele.instance.AllocatedResource} objects.
+	 * 
+	 * @param startVertex
+	 *        the execution vertex to start the deployment from
+	 */
+	public void deployAssignedVertices(final ExecutionVertex startVertex) {
+
+		final JobID jobID = startVertex.getExecutionGraph().getJobID();
+
+		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
+		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
+
+		findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
+
+		if (!verticesToBeDeployed.isEmpty()) {
+
+			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
+				.entrySet()
+				.iterator();
+
+			while (it2.hasNext()) {
+
+				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
+				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
+			}
+		}
+	}
+
+	/**
+	 * Collects all execution vertices with the state ASSIGNED from the given pipeline and deploys them on the assigned
+	 * {@link eu.stratosphere.nephele.instance.AllocatedResource} objects.
+	 * 
+	 * @param pipeline
+	 *        the execution pipeline to be deployed
+	 */
+	public void deployAssignedPipeline(final ExecutionPipeline pipeline) {
+
+		final JobID jobID = null;
+
+		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
+		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
+
+		final Iterator<ExecutionVertex> it = pipeline.iterator();
+		while (it.hasNext()) {
+			findVerticesToBeDeployed(it.next(), verticesToBeDeployed, alreadyVisited);
+		}
+
+		if (!verticesToBeDeployed.isEmpty()) {
+
+			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
+				.entrySet()
+				.iterator();
+
+			while (it2.hasNext()) {
+
+				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
+				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
+			}
+		}
+	}
+
+	/**
+	 * Collects all execution vertices with the state ASSIGNED starting from the given collection of start vertices and
+	 * deploys them on the assigned {@link eu.stratosphere.nephele.instance.AllocatedResource} objects.
+	 * 
+	 * @param startVertices
+	 *        the collection of execution vertices to start the deployment from
+	 */
+	public void deployAssignedVertices(final Collection<ExecutionVertex> startVertices) {
+
+		JobID jobID = null;
+
+		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
+		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
+
+		for (final ExecutionVertex startVertex : startVertices) {
+
+			if (jobID == null) {
+				jobID = startVertex.getExecutionGraph().getJobID();
+			}
+
+			findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
+		}
+
+		if (!verticesToBeDeployed.isEmpty()) {
+
+			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
+				.entrySet()
+				.iterator();
+
+			while (it2.hasNext()) {
+
+				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
+				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
+			}
+		}
+	}
+
+	/**
+	 * Collects all execution vertices with the state ASSIGNED starting from the input vertices of the current execution
+	 * stage and deploys them on the assigned {@link eu.stratosphere.nephele.instance.AllocatedResource} objects.
+	 * 
+	 * @param executionGraph
+	 *        the execution graph to collect the vertices from
+	 */
+	public void deployAssignedInputVertices(final ExecutionGraph executionGraph) {
+
+		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
+		final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
+
+		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
+
+		for (int i = 0; i < executionStage.getNumberOfStageMembers(); ++i) {
+
+			final ExecutionGroupVertex startVertex = executionStage.getStageMember(i);
+			if (!startVertex.isInputVertex()) {
+				continue;
+			}
+
+			for (int j = 0; j < startVertex.getCurrentNumberOfGroupMembers(); ++j) {
+				final ExecutionVertex vertex = startVertex.getGroupMember(j);
+				findVerticesToBeDeployed(vertex, verticesToBeDeployed, alreadyVisited);
+			}
+		}
+
+		if (!verticesToBeDeployed.isEmpty()) {
+
+			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
+				.entrySet()
+				.iterator();
+
+			while (it2.hasNext()) {
+
+				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
+				this.deploymentManager.deploy(executionGraph.getJobID(), entry.getKey(), entry.getValue());
+			}
+		}
+	}
+
+
+	@Override
+	public void resourcesAllocated(final JobID jobID, final List<AllocatedResource> allocatedResources) {
+
+		if (allocatedResources == null) {
+			LOG.error("Resource to lock is null!");
+			return;
+		}
+
+		for (final AllocatedResource allocatedResource : allocatedResources) {
+			if (allocatedResource.getInstance() instanceof DummyInstance) {
+				LOG.debug("Available instance is of type DummyInstance!");
+				return;
+			}
+		}
+
+		final ExecutionGraph eg = getExecutionGraphByID(jobID);
+
+		if (eg == null) {
+			/*
+			 * The job have have been canceled in the meantime, in this case
+			 * we release the instance immediately.
+			 */
+			try {
+				for (final AllocatedResource allocatedResource : allocatedResources) {
+					getInstanceManager().releaseAllocatedResource(allocatedResource);
+				}
+			} catch (InstanceException e) {
+				LOG.error(e);
+			}
+			return;
+		}
+
+		final Runnable command = new Runnable() {
+
+			/**
+			 * {@inheritDoc}
+			 */
+			@Override
+			public void run() {
+
+				final ExecutionStage stage = eg.getCurrentExecutionStage();
+
+				synchronized (stage) {
+
+					for (final AllocatedResource allocatedResource : allocatedResources) {
+
+						AllocatedResource resourceToBeReplaced = null;
+						// Important: only look for instances to be replaced in the current stage
+						final Iterator<ExecutionGroupVertex> groupIterator = new ExecutionGroupVertexIterator(eg, true,
+							stage.getStageNumber());
+						while (groupIterator.hasNext()) {
+
+							final ExecutionGroupVertex groupVertex = groupIterator.next();
+							for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
+
+								final ExecutionVertex vertex = groupVertex.getGroupMember(i);
+
+								if (vertex.getExecutionState() == ExecutionState.SCHEDULED
+									&& vertex.getAllocatedResource() != null) {
+										resourceToBeReplaced = vertex.getAllocatedResource();
+										break;
+								}
+							}
+
+							if (resourceToBeReplaced != null) {
+								break;
+							}
+						}
+
+						// For some reason, we don't need this instance
+						if (resourceToBeReplaced == null) {
+							LOG.error("Instance " + allocatedResource.getInstance() + " is not required for job"
+								+ eg.getJobID());
+							try {
+								getInstanceManager().releaseAllocatedResource(allocatedResource);
+							} catch (InstanceException e) {
+								LOG.error(e);
+							}
+							return;
+						}
+
+						// Replace the selected instance
+						final Iterator<ExecutionVertex> it = resourceToBeReplaced.assignedVertices();
+						while (it.hasNext()) {
+							final ExecutionVertex vertex = it.next();
+							vertex.setAllocatedResource(allocatedResource);
+							vertex.updateExecutionState(ExecutionState.ASSIGNED);
+						}
+					}
+				}
+
+				// Deploy the assigned vertices
+				deployAssignedInputVertices(eg);
+
+			}
+
+		};
+
+		eg.executeCommand(command);
+	}
+
+	/**
+	 * Checks if the given {@link AllocatedResource} is still required for the
+	 * execution of the given execution graph. If the resource is no longer
+	 * assigned to a vertex that is either currently running or about to run
+	 * the given resource is returned to the instance manager for deallocation.
+	 * 
+	 * @param executionGraph
+	 *        the execution graph the provided resource has been used for so far
+	 * @param allocatedResource
+	 *        the allocated resource to check the assignment for
+	 */
+	public void checkAndReleaseAllocatedResource(final ExecutionGraph executionGraph,
+			final AllocatedResource allocatedResource) {
+
+		if (allocatedResource == null) {
+			LOG.error("Resource to lock is null!");
+			return;
+		}
+
+		if (allocatedResource.getInstance() instanceof DummyInstance) {
+			LOG.debug("Available instance is of type DummyInstance!");
+			return;
+		}
+
+		boolean resourceCanBeReleased = true;
+		final Iterator<ExecutionVertex> it = allocatedResource.assignedVertices();
+		while (it.hasNext()) {
+			final ExecutionVertex vertex = it.next();
+			final ExecutionState state = vertex.getExecutionState();
+
+			if (state != ExecutionState.CREATED && state != ExecutionState.FINISHED
+				&& state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
+
+				resourceCanBeReleased = false;
+				break;
+			}
+		}
+
+		if (resourceCanBeReleased) {
+
+			LOG.info("Releasing instance " + allocatedResource.getInstance());
+			try {
+				getInstanceManager().releaseAllocatedResource(allocatedResource);
+			} catch (InstanceException e) {
+				LOG.error(StringUtils.stringifyException(e));
+			}
+		}
+	}
+
+	DeploymentManager getDeploymentManager() {
+		return this.deploymentManager;
+	}
+
+	protected void replayCheckpointsFromPreviousStage(final ExecutionGraph executionGraph) {
+
+		final int currentStageIndex = executionGraph.getIndexOfCurrentExecutionStage();
+		final ExecutionStage previousStage = executionGraph.getStage(currentStageIndex - 1);
+
+		final List<ExecutionVertex> verticesToBeReplayed = new ArrayList<ExecutionVertex>();
+
+		for (int i = 0; i < previousStage.getNumberOfOutputExecutionVertices(); ++i) {
+
+			final ExecutionVertex vertex = previousStage.getOutputExecutionVertex(i);
+			vertex.updateExecutionState(ExecutionState.ASSIGNED);
+			verticesToBeReplayed.add(vertex);
+		}
+
+		deployAssignedVertices(verticesToBeReplayed);
+	}
+
+	/**
+	 * Returns a map of vertices to be restarted once they have switched to their <code>CANCELED</code> state.
+	 * 
+	 * @return the map of vertices to be restarted
+	 */
+	Map<ExecutionVertexID, ExecutionVertex> getVerticesToBeRestarted() {
+
+		return this.verticesToBeRestarted;
+	}
+
+
+	@Override
+	public void allocatedResourcesDied(final JobID jobID, final List<AllocatedResource> allocatedResources) {
+
+		final ExecutionGraph eg = getExecutionGraphByID(jobID);
+
+		if (eg == null) {
+			LOG.error("Cannot find execution graph for job with ID " + jobID);
+			return;
+		}
+
+		final Runnable command = new Runnable() {
+
+			/**
+			 * {@inheritDoc}
+			 */
+			@Override
+			public void run() {
+
+				synchronized (eg) {
+
+					for (final AllocatedResource allocatedResource : allocatedResources) {
+
+						LOG.info("Resource " + allocatedResource.getInstance().getName() + " for Job " + jobID
+							+ " died.");
+
+						final ExecutionGraph executionGraph = getExecutionGraphByID(jobID);
+
+						if (executionGraph == null) {
+							LOG.error("Cannot find execution graph for job " + jobID);
+							return;
+						}
+
+						Iterator<ExecutionVertex> vertexIter = allocatedResource.assignedVertices();
+
+						// Assign vertices back to a dummy resource.
+						final DummyInstance dummyInstance = DummyInstance.createDummyInstance();
+						final AllocatedResource dummyResource = new AllocatedResource(dummyInstance,
+								new AllocationID());
+
+						while (vertexIter.hasNext()) {
+							final ExecutionVertex vertex = vertexIter.next();
+							vertex.setAllocatedResource(dummyResource);
+						}
+
+						final String failureMessage = allocatedResource.getInstance().getName() + " died";
+
+						vertexIter = allocatedResource.assignedVertices();
+
+						while (vertexIter.hasNext()) {
+							final ExecutionVertex vertex = vertexIter.next();
+							final ExecutionState state = vertex.getExecutionState();
+
+							switch (state) {
+							case ASSIGNED:
+							case READY:
+							case STARTING:
+							case RUNNING:
+							case FINISHING:
+
+							vertex.updateExecutionState(ExecutionState.FAILED, failureMessage);
+
+							break;
+						default:
+							}
+					}
+
+					// TODO: Fix this
+					/*
+					 * try {
+					 * requestInstances(this.executionVertex.getGroupVertex().getExecutionStage());
+					 * } catch (InstanceException e) {
+					 * e.printStackTrace();
+					 * // TODO: Cancel the entire job in this case
+					 * }
+					 */
+				}
+			}
+
+			final InternalJobStatus js = eg.getJobStatus();
+			if (js != InternalJobStatus.FAILING && js != InternalJobStatus.FAILED) {
+
+				// TODO: Fix this
+				// deployAssignedVertices(eg);
+
+				final ExecutionStage stage = eg.getCurrentExecutionStage();
+
+				try {
+					requestInstances(stage);
+				} catch (InstanceException e) {
+					e.printStackTrace();
+					// TODO: Cancel the entire job in this case
+				}
+			}
+		}
+		};
+
+		eg.executeCommand(command);
+	}
+}