You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:54 UTC

[30/63] [abbrv] Redesign Scheduler from pre-assignment to more flexible schedule-on-demand model

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
index 701f802..d23d35f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
@@ -16,757 +16,963 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import java.util.ArrayList;
-import java.util.Collection;
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.Deque;
-import java.util.ArrayDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionEdge;
-import org.apache.flink.runtime.executiongraph.ExecutionGate;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphIterator;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionGroupVertexIterator;
-import org.apache.flink.runtime.executiongraph.ExecutionPipeline;
-import org.apache.flink.runtime.executiongraph.ExecutionStage;
-import org.apache.flink.runtime.executiongraph.ExecutionStageListener;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-import org.apache.flink.runtime.executiongraph.InternalJobStatus;
-import org.apache.flink.runtime.executiongraph.JobStatusListener;
-import org.apache.flink.runtime.instance.AllocatedResource;
-import org.apache.flink.runtime.instance.AllocationID;
-import org.apache.flink.runtime.instance.DummyInstance;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceException;
+import org.apache.flink.runtime.instance.InstanceDiedException;
 import org.apache.flink.runtime.instance.InstanceListener;
-import org.apache.flink.runtime.instance.InstanceManager;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobmanager.DeploymentManager;
-import org.apache.flink.util.StringUtils;
 
 /**
- * The default scheduler for Nephele. While Nephele's
- * {@link org.apache.flink.runtime.jobmanager.JobManager} is responsible for requesting the required instances for the
- * job at the {@link org.apache.flink.runtime.instance.InstanceManager}, the scheduler is in charge of assigning the
- * individual tasks to the instances.
- * 
+ * The scheduler is responsible for distributing the ready-to-run tasks and assigning them to instances and
+ * slots.
+ * <p>
+ * The scheduler's bookkeeping on the available instances is lazy: It is not modified once an
+ * instance is dead, but it will lazily remove the instance from its pool as soon as it tries
+ * to allocate a resource on that instance and it fails with an {@link InstanceDiedException}.
  */
-public class DefaultScheduler implements InstanceListener, JobStatusListener, ExecutionStageListener {
+public class DefaultScheduler implements InstanceListener {
 
-	/**
-	 * The LOG object to report events within the scheduler.
-	 */
 	protected static final Logger LOG = LoggerFactory.getLogger(DefaultScheduler.class);
 
-	/**
-	 * The instance manager assigned to this scheduler.
-	 */
-	private final InstanceManager instanceManager;
-
-	/**
-	 * The deployment manager assigned to this scheduler.
-	 */
-	private final DeploymentManager deploymentManager;
-
-	/**
-	 * Stores the vertices to be restarted once they have switched to the <code>CANCELED</code> state.
-	 */
-	private final Map<ExecutionVertexID, ExecutionVertex> verticesToBeRestarted = new ConcurrentHashMap<ExecutionVertexID, ExecutionVertex>();
-
-	/**
-	 * The job queue where all submitted jobs go to.
-	 */
-	private Deque<ExecutionGraph> jobQueue = new ArrayDeque<ExecutionGraph>();
-
-	/**
-	 * Constructs a new abstract scheduler.
-	 * 
-	 * @param deploymentManager
-	 *        the deployment manager assigned to this scheduler
-	 * @param instanceManager
-	 *        the instance manager to be used with this scheduler
-	 */
-	public DefaultScheduler(final DeploymentManager deploymentManager, final InstanceManager instanceManager) {
-
-		this.deploymentManager = deploymentManager;
-		this.instanceManager = instanceManager;
-		this.instanceManager.setInstanceListener(this);
+	
+	private final Object lock = new Object();
+	
+	/** All instances that the scheduler can deploy to */
+	private final Set<Instance> allInstances = new HashSet<Instance>();
+	
+	/** All instances that still have available resources */
+	private final Queue<Instance> instancesWithAvailableResources = new LifoSetQueue<Instance>();
+
+	
+	private final ConcurrentHashMap<ResourceId, AllocatedSlot> allocatedSlots = new ConcurrentHashMap<ResourceId, AllocatedSlot>();
+	
+//	/** A cache that remembers the last resource IDs it has seen, to co-locate future
+//	 *  deployments of tasks with the same resource ID to the same instance.
+//	 */
+//	private final Cache<ResourceId, Instance> ghostCache;
+	
+	
+	/** All tasks pending to be scheduled */
+	private final LinkedBlockingQueue<ScheduledUnit> taskQueue = new LinkedBlockingQueue<ScheduledUnit>();
+
+	
+	/** The thread that runs the scheduling loop, picking up tasks to be scheduled and scheduling them. */
+	private final Thread schedulerThread;
+	
+	
+	/** Atomic flag to safely control the shutdown */
+	private final AtomicBoolean shutdown = new AtomicBoolean(false);
+	
+	/** Flag indicating whether the scheduler should reject a unit if it cannot find a resource
+	 * for it at the time of scheduling */
+	private final boolean rejectIfNoResourceAvailable;
+	
+
+	
+	public DefaultScheduler() {
+		this(true);
 	}
-
-	/**
-	 * Removes the job represented by the given {@link ExecutionGraph} from the scheduler.
-	 *
-	 * @param executionGraphToRemove
-	 *        the job to be removed
-	 */
-	void removeJobFromSchedule(final ExecutionGraph executionGraphToRemove) {
-
-		boolean removedFromQueue = false;
-
-		synchronized (this.jobQueue) {
-
-			final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
-			while (it.hasNext()) {
-
-				final ExecutionGraph executionGraph = it.next();
-				if (executionGraph.getJobID().equals(executionGraphToRemove.getJobID())) {
-					removedFromQueue = true;
-					it.remove();
-					break;
-				}
+	
+	public DefaultScheduler(boolean rejectIfNoResourceAvailable) {
+		this.rejectIfNoResourceAvailable = rejectIfNoResourceAvailable;
+		
+		
+//		this.ghostCache = CacheBuilder.newBuilder()
+//				.initialCapacity(64)	// easy start
+//				.maximumSize(1024)		// retain some history
+//				.weakValues()			// do not prevent dead instances from being collected
+//				.build();
+		
+		// set up (but do not start) the scheduling thread
+		Runnable loopRunner = new Runnable() {
+			@Override
+			public void run() {
+				runSchedulerLoop();
 			}
-		}
-
-		if (!removedFromQueue) {
-			LOG.error("Cannot find job " + executionGraphToRemove.getJobName() + " ("
-					+ executionGraphToRemove.getJobID() + ") to remove");
-		}
+		};
+		this.schedulerThread = new Thread(loopRunner, "Scheduling Thread");
 	}
-
-	/**
-	 * Adds a job represented by an {@link ExecutionGraph} object to the scheduler. The job is then executed according
-	 * to the strategies of the concrete scheduler implementation.
-	 *
-	 * @param executionGraph
-	 *        the job to be added to the scheduler
-	 * @throws SchedulingException
-	 *         thrown if an error occurs and the scheduler does not accept the new job
-	 */
-	public void scheduleJob(final ExecutionGraph executionGraph) throws SchedulingException {
-
-		final int requiredSlots = executionGraph.getRequiredSlots();
-		final int availableSlots = this.getInstanceManager().getNumberOfSlots();
-
-		if(requiredSlots > availableSlots){
-			throw new SchedulingException(String.format(
-					"Not enough available task slots to run job %s (%s). Required: %d Available: %d . "
-					+ "Either reduce the parallelism of your program, wait for other programs to finish, or increase "
-					+ "the number of task slots in the cluster by adding more machines or increasing the number of slots "
-					+ "per machine in conf/flink-conf.yaml .", 
-					executionGraph.getJobName(), executionGraph.getJobID(), requiredSlots, availableSlots));
-		}
-
-		// Subscribe to job status notifications
-		executionGraph.registerJobStatusListener(this);
-
-		// Register execution listener for each vertex
-		final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, true);
-		while (it2.hasNext()) {
-
-			final ExecutionVertex vertex = it2.next();
-			vertex.registerExecutionListener(new DefaultExecutionListener(this, vertex));
-		}
-
-		// Register the scheduler as an execution stage listener
-		executionGraph.registerExecutionStageListener(this);
-
-		// Add job to the job queue (important to add job to queue before requesting instances)
-		synchronized (this.jobQueue) {
-			this.jobQueue.add(executionGraph);
+	
+	public void start() {
+		if (shutdown.get()) {
+			throw new IllegalStateException("Scheduler has been shut down.");
 		}
-
-		// Request resources for the first stage of the job
-
-		final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
+		
 		try {
-			requestInstances(executionStage);
-		} catch (InstanceException e) {
-			final String exceptionMessage = StringUtils.stringifyException(e);
-			LOG.error(exceptionMessage);
-			this.jobQueue.remove(executionGraph);
-			throw new SchedulingException(exceptionMessage);
+			this.schedulerThread.start();
 		}
-	}
-
-	/**
-	 * Returns the execution graph which is associated with the given job ID.
-	 *
-	 * @param jobID
-	 *        the job ID to search the execution graph for
-	 * @return the execution graph which belongs to the given job ID or <code>null</code if no such execution graph
-	 *         exists
-	 */
-	public ExecutionGraph getExecutionGraphByID(final JobID jobID) {
-
-		synchronized (this.jobQueue) {
-
-			final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
-			while (it.hasNext()) {
-
-				final ExecutionGraph executionGraph = it.next();
-				if (executionGraph.getJobID().equals(jobID)) {
-					return executionGraph;
-				}
-			}
+		catch (IllegalThreadStateException e) {
+			throw new IllegalStateException("The scheduler has already been started.");
 		}
-
-		return null;
 	}
-
+	
 	/**
-	 * Shuts the scheduler down. After shut down no jobs can be added to the scheduler.
+	 * Shuts the scheduler down. After shut down no more tasks can be added to the scheduler.
 	 */
 	public void shutdown() {
-
-		synchronized (this.jobQueue) {
-			this.jobQueue.clear();
+		if (this.shutdown.compareAndSet(false, true)) {
+			// clear the task queue and add the termination signal, to let
+			// the scheduling loop know that things are done
+			this.taskQueue.clear();
+			this.taskQueue.add(TERMINATION_SIGNAL);
+			
+			// interrupt the scheduling thread, in case it was waiting for resources to
+			// show up to deploy a task
+			this.schedulerThread.interrupt();
 		}
-
 	}
-
-	public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
-									final String optionalMessage) {
-
-		if (newJobStatus == InternalJobStatus.FAILED || newJobStatus == InternalJobStatus.FINISHED
-				|| newJobStatus == InternalJobStatus.CANCELED) {
-			removeJobFromSchedule(executionGraph);
-		}
-	}
-
-	public void nextExecutionStageEntered(final JobID jobID, final ExecutionStage executionStage) {
-
-		// Request new instances if necessary
-		try {
-			requestInstances(executionStage);
-		} catch (InstanceException e) {
-			// TODO: Handle error correctly
-			LOG.error(StringUtils.stringifyException(e));
+	
+	public void setUncaughtExceptionHandler(UncaughtExceptionHandler handler) {
+		if (this.schedulerThread.getState() != Thread.State.NEW) {
+			throw new IllegalStateException("Can only add exception handler before starting the scheduler.");
 		}
-
-		// Deploy the assigned vertices
-		deployAssignedInputVertices(executionStage.getExecutionGraph());
+		this.schedulerThread.setUncaughtExceptionHandler(handler);
 	}
 
-
-	/**
-	 * Returns the {@link org.apache.flink.runtime.instance.InstanceManager} object which is used by the current scheduler.
-	 * 
-	 * @return the {@link org.apache.flink.runtime.instance.InstanceManager} object which is used by the current scheduler
-	 */
-	public InstanceManager getInstanceManager() {
-		return this.instanceManager;
+//	/**
+//	 * Removes the job represented by the given {@link ExecutionGraph} from the scheduler.
+//	 *
+//	 * @param executionGraphToRemove
+//	 *        the job to be removed
+//	 */
+//	void removeJobFromSchedule(final ExecutionGraph executionGraphToRemove) {
+//
+//		boolean removedFromQueue = false;
+//
+//		synchronized (this.jobQueue) {
+//
+//			final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
+//			while (it.hasNext()) {
+//
+//				final ExecutionGraph executionGraph = it.next();
+//				if (executionGraph.getJobID().equals(executionGraphToRemove.getJobID())) {
+//					removedFromQueue = true;
+//					it.remove();
+//					break;
+//				}
+//			}
+//		}
+//
+//		if (!removedFromQueue) {
+//			LOG.error("Cannot find job " + executionGraphToRemove.getJobName() + " ("
+//					+ executionGraphToRemove.getJobID() + ") to remove");
+//		}
+//	}
+//
+//	/**
+//	 * Adds a job represented by an {@link ExecutionGraph} object to the scheduler. The job is then executed according
+//	 * to the strategies of the concrete scheduler implementation.
+//	 *
+//	 * @param executionGraph
+//	 *        the job to be added to the scheduler
+//	 * @throws SchedulingException
+//	 *         thrown if an error occurs and the scheduler does not accept the new job
+//	 */
+//	public void scheduleJob(final ExecutionGraph executionGraph) throws SchedulingException {
+//
+//		final int requiredSlots = executionGraph.getRequiredSlots();
+//		final int availableSlots = this.getInstanceManager().getNumberOfSlots();
+//
+//		if(requiredSlots > availableSlots){
+//			throw new SchedulingException(String.format(
+//					"Not enough available task slots to run job %s (%s). Required: %d Available: %d . "
+//					+ "Either reduce the parallelism of your program, wait for other programs to finish, or increase "
+//					+ "the number of task slots in the cluster by adding more machines or increasing the number of slots "
+//					+ "per machine in conf/flink-conf.yaml .", 
+//					executionGraph.getJobName(), executionGraph.getJobID(), requiredSlots, availableSlots));
+//		}
+//
+//		// Subscribe to job status notifications
+//		executionGraph.registerJobStatusListener(this);
+//
+//		// Register execution listener for each vertex
+//		final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, true);
+//		while (it2.hasNext()) {
+//
+//			final ExecutionVertex vertex = it2.next();
+//			vertex.registerExecutionListener(new DefaultExecutionListener(this, vertex));
+//		}
+//
+//		// Register the scheduler as an execution stage listener
+//		executionGraph.registerExecutionStageListener(this);
+//
+//		// Add job to the job queue (important to add job to queue before requesting instances)
+//		synchronized (this.jobQueue) {
+//			this.jobQueue.add(executionGraph);
+//		}
+//
+//		// Request resources for the first stage of the job
+//
+//		final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
+//		try {
+//			requestInstances(executionStage);
+//		} catch (InstanceException e) {
+//			final String exceptionMessage = StringUtils.stringifyException(e);
+//			LOG.error(exceptionMessage);
+//			this.jobQueue.remove(executionGraph);
+//			throw new SchedulingException(exceptionMessage);
+//		}
+//	}
+//
+//	/**
+//	 * Returns the execution graph which is associated with the given job ID.
+//	 *
+//	 * @param jobID
+//	 *        the job ID to search the execution graph for
+//	 * @return the execution graph which belongs to the given job ID or <code>null</code> if no such execution graph
+//	 *         exists
+//	 */
+//	public ExecutionGraph getExecutionGraphByID(final JobID jobID) {
+//
+//		synchronized (this.jobQueue) {
+//
+//			final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
+//			while (it.hasNext()) {
+//
+//				final ExecutionGraph executionGraph = it.next();
+//				if (executionGraph.getJobID().equals(jobID)) {
+//					return executionGraph;
+//				}
+//			}
+//		}
+//
+//		return null;
+//	}
+//
+//
+//
+//	public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
+//									final String optionalMessage) {
+//
+//		if (newJobStatus == InternalJobStatus.FAILED || newJobStatus == InternalJobStatus.FINISHED
+//				|| newJobStatus == InternalJobStatus.CANCELED) {
+//			removeJobFromSchedule(executionGraph);
+//		}
+//	}
+//
+//	public void nextExecutionStageEntered(final JobID jobID, final ExecutionStage executionStage) {
+//
+//		// Request new instances if necessary
+//		try {
+//			requestInstances(executionStage);
+//		} catch (InstanceException e) {
+//			// TODO: Handle error correctly
+//			LOG.error(StringUtils.stringifyException(e));
+//		}
+//
+//		// Deploy the assigned vertices
+//		deployAssignedInputVertices(executionStage.getExecutionGraph());
+//	}
+//
+//
+//	/**
+//	 * Returns the {@link org.apache.flink.runtime.instance.InstanceManager} object which is used by the current scheduler.
+//	 * 
+//	 * @return the {@link org.apache.flink.runtime.instance.InstanceManager} object which is used by the current scheduler
+//	 */
+//	public InstanceManager getInstanceManager() {
+//		return this.instanceManager;
+//	}
+//
+//
+//	/**
+//	 * Collects the instances required to run the job from the given {@link ExecutionStage} and requests them at the
+//	 * loaded instance manager.
+//	 * 
+//	 * @param executionStage
+//	 *        the execution stage to collect the required instances from
+//	 * @throws InstanceException
+//	 *         thrown if the given execution graph is already processing its final stage
+//	 */
+//	protected void requestInstances(final ExecutionStage executionStage) throws InstanceException {
+//
+//		final ExecutionGraph executionGraph = executionStage.getExecutionGraph();
+//
+//		synchronized (executionStage) {
+//
+//			final int requiredSlots = executionStage.getRequiredSlots();
+//
+//			LOG.info("Requesting " + requiredSlots + " slots for job " + executionGraph.getJobID());
+//
+//			this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(),
+//				requiredSlots);
+//
+//			// Switch vertex state to assigning
+//			final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, executionGraph
+//				.getIndexOfCurrentExecutionStage(), true, true);
+//			while (it2.hasNext()) {
+//
+//				it2.next().compareAndUpdateExecutionState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
+//			}
+//		}
+//	}
+//
+//	void findVerticesToBeDeployed(final ExecutionVertex vertex,
+//			final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed,
+//			final Set<ExecutionVertex> alreadyVisited) {
+//
+//		if (!alreadyVisited.add(vertex)) {
+//			return;
+//		}
+//
+//		if (vertex.compareAndUpdateExecutionState(ExecutionState.ASSIGNED, ExecutionState.READY)) {
+//			final Instance instance = vertex.getAllocatedResource().getInstance();
+//
+//			if (instance instanceof DummyInstance) {
+//				LOG.error("Inconsistency: Vertex " + vertex + " is about to be deployed on a DummyInstance");
+//			}
+//
+//			List<ExecutionVertex> verticesForInstance = verticesToBeDeployed.get(instance);
+//			if (verticesForInstance == null) {
+//				verticesForInstance = new ArrayList<ExecutionVertex>();
+//				verticesToBeDeployed.put(instance, verticesForInstance);
+//			}
+//
+//			verticesForInstance.add(vertex);
+//		}
+//
+//		final int numberOfOutputGates = vertex.getNumberOfOutputGates();
+//		for (int i = 0; i < numberOfOutputGates; ++i) {
+//
+//			final ExecutionGate outputGate = vertex.getOutputGate(i);
+//			boolean deployTarget;
+//
+//			switch (outputGate.getChannelType()) {
+//			case NETWORK:
+//				deployTarget = false;
+//				break;
+//			case IN_MEMORY:
+//				deployTarget = true;
+//				break;
+//			default:
+//				throw new IllegalStateException("Unknown channel type");
+//			}
+//
+//			if (deployTarget) {
+//
+//				final int numberOfOutputChannels = outputGate.getNumberOfEdges();
+//				for (int j = 0; j < numberOfOutputChannels; ++j) {
+//					final ExecutionEdge outputChannel = outputGate.getEdge(j);
+//					final ExecutionVertex connectedVertex = outputChannel.getInputGate().getVertex();
+//					findVerticesToBeDeployed(connectedVertex, verticesToBeDeployed, alreadyVisited);
+//				}
+//			}
+//		}
+//	}
+//
+//	/**
+//	 * Collects all execution vertices with the state ASSIGNED starting from the given start vertex and
+//	 * deploys them on the assigned {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
+//	 * 
+//	 * @param startVertex
+//	 *        the execution vertex to start the deployment from
+//	 */
+//	public void deployAssignedVertices(final ExecutionVertex startVertex) {
+//
+//		final JobID jobID = startVertex.getExecutionGraph().getJobID();
+//
+//		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
+//		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
+//
+//		findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
+//
+//		if (!verticesToBeDeployed.isEmpty()) {
+//
+//			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
+//				.entrySet()
+//				.iterator();
+//
+//			while (it2.hasNext()) {
+//
+//				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
+//				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
+//			}
+//		}
+//	}
+//
+//	/**
+//	 * Collects all execution vertices with the state ASSIGNED from the given pipeline and deploys them on the assigned
+//	 * {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
+//	 * 
+//	 * @param pipeline
+//	 *        the execution pipeline to be deployed
+//	 */
+//	public void deployAssignedPipeline(final ExecutionPipeline pipeline) {
+//
+//		final JobID jobID = null;
+//
+//		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
+//		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
+//
+//		final Iterator<ExecutionVertex> it = pipeline.iterator();
+//		while (it.hasNext()) {
+//			findVerticesToBeDeployed(it.next(), verticesToBeDeployed, alreadyVisited);
+//		}
+//
+//		if (!verticesToBeDeployed.isEmpty()) {
+//
+//			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
+//				.entrySet()
+//				.iterator();
+//
+//			while (it2.hasNext()) {
+//
+//				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
+//				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
+//			}
+//		}
+//	}
+//
+//	/**
+//	 * Collects all execution vertices with the state ASSIGNED starting from the given collection of start vertices and
+//	 * deploys them on the assigned {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
+//	 * 
+//	 * @param startVertices
+//	 *        the collection of execution vertices to start the deployment from
+//	 */
+//	public void deployAssignedVertices(final Collection<ExecutionVertex> startVertices) {
+//
+//		JobID jobID = null;
+//
+//		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
+//		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
+//
+//		for (final ExecutionVertex startVertex : startVertices) {
+//
+//			if (jobID == null) {
+//				jobID = startVertex.getExecutionGraph().getJobID();
+//			}
+//
+//			findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
+//		}
+//
+//		if (!verticesToBeDeployed.isEmpty()) {
+//
+//			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
+//				.entrySet()
+//				.iterator();
+//
+//			while (it2.hasNext()) {
+//
+//				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
+//				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
+//			}
+//		}
+//	}
+//
+//	/**
+//	 * Collects all execution vertices with the state ASSIGNED starting from the input vertices of the current execution
+//	 * stage and deploys them on the assigned {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
+//	 * 
+//	 * @param executionGraph
+//	 *        the execution graph to collect the vertices from
+//	 */
+//	public void deployAssignedInputVertices(final ExecutionGraph executionGraph) {
+//
+//		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
+//		final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
+//
+//		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
+//
+//		for (int i = 0; i < executionStage.getNumberOfStageMembers(); ++i) {
+//
+//			final ExecutionGroupVertex startVertex = executionStage.getStageMember(i);
+//			if (!startVertex.isInputVertex()) {
+//				continue;
+//			}
+//
+//			for (int j = 0; j < startVertex.getCurrentNumberOfGroupMembers(); ++j) {
+//				final ExecutionVertex vertex = startVertex.getGroupMember(j);
+//				findVerticesToBeDeployed(vertex, verticesToBeDeployed, alreadyVisited);
+//			}
+//		}
+//
+//		if (!verticesToBeDeployed.isEmpty()) {
+//
+//			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
+//				.entrySet()
+//				.iterator();
+//
+//			while (it2.hasNext()) {
+//
+//				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
+//				this.deploymentManager.deploy(executionGraph.getJobID(), entry.getKey(), entry.getValue());
+//			}
+//		}
+//	}
+//
+//
+//	@Override
+//	public void resourcesAllocated(final JobID jobID, final List<AllocatedResource> allocatedResources) {
+//
+//		if (allocatedResources == null) {
+//			LOG.error("Resource to lock is null!");
+//			return;
+//		}
+//
+//		for (final AllocatedResource allocatedResource : allocatedResources) {
+//			if (allocatedResource.getInstance() instanceof DummyInstance) {
+//				LOG.debug("Available instance is of type DummyInstance!");
+//				return;
+//			}
+//		}
+//
+//		final ExecutionGraph eg = getExecutionGraphByID(jobID);
+//
+//		if (eg == null) {
+//			/*
+//			 * The job may have been canceled in the meantime, in this case
+//			 * we release the instance immediately.
+//			 */
+//			try {
+//				for (final AllocatedResource allocatedResource : allocatedResources) {
+//					getInstanceManager().releaseAllocatedResource(allocatedResource);
+//				}
+//			} catch (InstanceException e) {
+//				LOG.error(e);
+//			}
+//			return;
+//		}
+//
+//		final Runnable command = new Runnable() {
+//
+//			/**
+//			 * {@inheritDoc}
+//			 */
+//			@Override
+//			public void run() {
+//
+//				final ExecutionStage stage = eg.getCurrentExecutionStage();
+//
+//				synchronized (stage) {
+//
+//					for (final AllocatedResource allocatedResource : allocatedResources) {
+//
+//						AllocatedResource resourceToBeReplaced = null;
+//						// Important: only look for instances to be replaced in the current stage
+//						final Iterator<ExecutionGroupVertex> groupIterator = new ExecutionGroupVertexIterator(eg, true,
+//							stage.getStageNumber());
+//						while (groupIterator.hasNext()) {
+//
+//							final ExecutionGroupVertex groupVertex = groupIterator.next();
+//							for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
+//
+//								final ExecutionVertex vertex = groupVertex.getGroupMember(i);
+//
+//								if (vertex.getExecutionState() == ExecutionState.SCHEDULED
+//									&& vertex.getAllocatedResource() != null) {
+//										resourceToBeReplaced = vertex.getAllocatedResource();
+//										break;
+//								}
+//							}
+//
+//							if (resourceToBeReplaced != null) {
+//								break;
+//							}
+//						}
+//
+//						// For some reason, we don't need this instance
+//						if (resourceToBeReplaced == null) {
+//							LOG.error("Instance " + allocatedResource.getInstance() + " is not required for job"
+//								+ eg.getJobID());
+//							try {
+//								getInstanceManager().releaseAllocatedResource(allocatedResource);
+//							} catch (InstanceException e) {
+//								LOG.error(e);
+//							}
+//							return;
+//						}
+//
+//						// Replace the selected instance
+//						final Iterator<ExecutionVertex> it = resourceToBeReplaced.assignedVertices();
+//						while (it.hasNext()) {
+//							final ExecutionVertex vertex = it.next();
+//							vertex.setAllocatedResource(allocatedResource);
+//							vertex.updateExecutionState(ExecutionState.ASSIGNED);
+//						}
+//					}
+//				}
+//
+//				// Deploy the assigned vertices
+//				deployAssignedInputVertices(eg);
+//
+//			}
+//
+//		};
+//
+//		eg.executeCommand(command);
+//	}
+//
+//	/**
+//	 * Checks if the given {@link AllocatedResource} is still required for the
+//	 * execution of the given execution graph. If the resource is no longer
+//	 * assigned to a vertex that is either currently running or about to run,
+//	 * the given resource is returned to the instance manager for deallocation.
+//	 * 
+//	 * @param executionGraph
+//	 *        the execution graph the provided resource has been used for so far
+//	 * @param allocatedResource
+//	 *        the allocated resource to check the assignment for
+//	 */
+//	public void checkAndReleaseAllocatedResource(final ExecutionGraph executionGraph,
+//			final AllocatedResource allocatedResource) {
+//
+//		if (allocatedResource == null) {
+//			LOG.error("Resource to lock is null!");
+//			return;
+//		}
+//
+//		if (allocatedResource.getInstance() instanceof DummyInstance) {
+//			LOG.debug("Available instance is of type DummyInstance!");
+//			return;
+//		}
+//
+//		boolean resourceCanBeReleased = true;
+//		final Iterator<ExecutionVertex> it = allocatedResource.assignedVertices();
+//		while (it.hasNext()) {
+//			final ExecutionVertex vertex = it.next();
+//			final ExecutionState state = vertex.getExecutionState();
+//
+//			if (state != ExecutionState.CREATED && state != ExecutionState.FINISHED
+//				&& state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
+//
+//				resourceCanBeReleased = false;
+//				break;
+//			}
+//		}
+//
+//		if (resourceCanBeReleased) {
+//
+//			LOG.info("Releasing instance " + allocatedResource.getInstance());
+//			try {
+//				getInstanceManager().releaseAllocatedResource(allocatedResource);
+//			} catch (InstanceException e) {
+//				LOG.error(StringUtils.stringifyException(e));
+//			}
+//		}
+//	}
+//
+//	DeploymentManager getDeploymentManager() {
+//		return this.deploymentManager;
+//	}
+//
+//
+//
+//	@Override
+//	public void allocatedResourcesDied(final JobID jobID, final List<AllocatedResource> allocatedResources) {
+//
+//		final ExecutionGraph eg = getExecutionGraphByID(jobID);
+//
+//		if (eg == null) {
+//			LOG.error("Cannot find execution graph for job with ID " + jobID);
+//			return;
+//		}
+//
+//		final Runnable command = new Runnable() {
+//
+//			/**
+//			 * {@inheritDoc}
+//			 */
+//			@Override
+//			public void run() {
+//
+//				synchronized (eg) {
+//
+//					for (final AllocatedResource allocatedResource : allocatedResources) {
+//
+//						LOG.info("Resource " + allocatedResource.getInstance().getName() + " for Job " + jobID
+//							+ " died.");
+//
+//						final ExecutionGraph executionGraph = getExecutionGraphByID(jobID);
+//
+//						if (executionGraph == null) {
+//							LOG.error("Cannot find execution graph for job " + jobID);
+//							return;
+//						}
+//
+//						Iterator<ExecutionVertex> vertexIter = allocatedResource.assignedVertices();
+//
+//						// Assign vertices back to a dummy resource.
+//						final DummyInstance dummyInstance = DummyInstance.createDummyInstance();
+//						final AllocatedResource dummyResource = new AllocatedResource(dummyInstance,
+//								new AllocationID());
+//
+//						while (vertexIter.hasNext()) {
+//							final ExecutionVertex vertex = vertexIter.next();
+//							vertex.setAllocatedResource(dummyResource);
+//						}
+//
+//						final String failureMessage = allocatedResource.getInstance().getName() + " died";
+//
+//						vertexIter = allocatedResource.assignedVertices();
+//
+//						while (vertexIter.hasNext()) {
+//							final ExecutionVertex vertex = vertexIter.next();
+//							final ExecutionState state = vertex.getExecutionState();
+//
+//							switch (state) {
+//							case ASSIGNED:
+//							case READY:
+//							case STARTING:
+//							case RUNNING:
+//							case FINISHING:
+//
+//							vertex.updateExecutionState(ExecutionState.FAILED, failureMessage);
+//
+//							break;
+//						default:
+//							}
+//					}
+//
+//					// TODO: Fix this
+//					/*
+//					 * try {
+//					 * requestInstances(this.executionVertex.getGroupVertex().getExecutionStage());
+//					 * } catch (InstanceException e) {
+//					 * e.printStackTrace();
+//					 * // TODO: Cancel the entire job in this case
+//					 * }
+//					 */
+//				}
+//			}
+//
+//			final InternalJobStatus js = eg.getJobStatus();
+//			if (js != InternalJobStatus.FAILING && js != InternalJobStatus.FAILED) {
+//
+//				// TODO: Fix this
+//				// deployAssignedVertices(eg);
+//
+//				final ExecutionStage stage = eg.getCurrentExecutionStage();
+//
+//				try {
+//					requestInstances(stage);
+//				} catch (InstanceException e) {
+//					e.printStackTrace();
+//					// TODO: Cancel the entire job in this case
+//				}
+//			}
+//		}
+//		};
+//
+//		eg.executeCommand(command);
+//	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Canceling
+	// --------------------------------------------------------------------------------------------
+	
+	public void removeAllTasksForJob(JobID job) {
+		
 	}
 
-
-	/**
-	 * Collects the instances required to run the job from the given {@link ExecutionStage} and requests them at the
-	 * loaded instance manager.
-	 * 
-	 * @param executionStage
-	 *        the execution stage to collect the required instances from
-	 * @throws InstanceException
-	 *         thrown if the given execution graph is already processing its final stage
-	 */
-	protected void requestInstances(final ExecutionStage executionStage) throws InstanceException {
-
-		final ExecutionGraph executionGraph = executionStage.getExecutionGraph();
-
-		synchronized (executionStage) {
-
-			final int requiredSlots = executionStage.getRequiredSlots();
-
-			LOG.info("Requesting " + requiredSlots + " slots for job " + executionGraph.getJobID());
-
-			this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(),
-				requiredSlots);
-
-			// Switch vertex state to assigning
-			final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, executionGraph
-				.getIndexOfCurrentExecutionStage(), true, true);
-			while (it2.hasNext()) {
-
-				it2.next().compareAndUpdateExecutionState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
-			}
+	// --------------------------------------------------------------------------------------------
+	//  Instance Availability
+	// --------------------------------------------------------------------------------------------
+	
+	
+	@Override
+	public void newInstanceAvailable(Instance instance) {
+		if (instance == null) {
+			throw new IllegalArgumentException();
 		}
-	}
-
-	void findVerticesToBeDeployed(final ExecutionVertex vertex,
-			final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed,
-			final Set<ExecutionVertex> alreadyVisited) {
-
-		if (!alreadyVisited.add(vertex)) {
-			return;
+		if (instance.getNumberOfAvailableSlots() <= 0) {
+			throw new IllegalArgumentException("The given instance has no resources.");
 		}
-
-		if (vertex.compareAndUpdateExecutionState(ExecutionState.ASSIGNED, ExecutionState.READY)) {
-			final Instance instance = vertex.getAllocatedResource().getInstance();
-
-			if (instance instanceof DummyInstance) {
-				LOG.error("Inconsistency: Vertex " + vertex + " is about to be deployed on a DummyInstance");
-			}
-
-			List<ExecutionVertex> verticesForInstance = verticesToBeDeployed.get(instance);
-			if (verticesForInstance == null) {
-				verticesForInstance = new ArrayList<ExecutionVertex>();
-				verticesToBeDeployed.put(instance, verticesForInstance);
-			}
-
-			verticesForInstance.add(vertex);
+		if (!instance.isAlive()) {
+			throw new IllegalArgumentException("The instance is not alive.");
 		}
-
-		final int numberOfOutputGates = vertex.getNumberOfOutputGates();
-		for (int i = 0; i < numberOfOutputGates; ++i) {
-
-			final ExecutionGate outputGate = vertex.getOutputGate(i);
-			boolean deployTarget;
-
-			switch (outputGate.getChannelType()) {
-			case NETWORK:
-				deployTarget = false;
-				break;
-			case IN_MEMORY:
-				deployTarget = true;
-				break;
-			default:
-				throw new IllegalStateException("Unknown channel type");
-			}
-
-			if (deployTarget) {
-
-				final int numberOfOutputChannels = outputGate.getNumberOfEdges();
-				for (int j = 0; j < numberOfOutputChannels; ++j) {
-					final ExecutionEdge outputChannel = outputGate.getEdge(j);
-					final ExecutionVertex connectedVertex = outputChannel.getInputGate().getVertex();
-					findVerticesToBeDeployed(connectedVertex, verticesToBeDeployed, alreadyVisited);
-				}
+		
+		// synchronize globally for instance changes
+		synchronized (this.lock) {
+			// check we do not already use this instance
+			if (!this.allInstances.add(instance)) {
+				throw new IllegalArgumentException("The instance is already contained.");
 			}
+			
+			// add it to the available resources and let potential waiters know
+			this.instancesWithAvailableResources.add(instance);
+			this.lock.notifyAll();
 		}
 	}
-
-	/**
-	 * Collects all execution vertices with the state ASSIGNED starting from the given start vertex and
-	 * deploys them on the assigned {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
-	 * 
-	 * @param startVertex
-	 *        the execution vertex to start the deployment from
-	 */
-	public void deployAssignedVertices(final ExecutionVertex startVertex) {
-
-		final JobID jobID = startVertex.getExecutionGraph().getJobID();
-
-		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
-		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-
-		findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
-
-		if (!verticesToBeDeployed.isEmpty()) {
-
-			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-				.entrySet()
-				.iterator();
-
-			while (it2.hasNext()) {
-
-				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
-				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
-			}
+	
+	@Override
+	public void instanceDied(Instance instance) {
+		if (instance == null) {
+			throw new IllegalArgumentException();
 		}
-	}
-
-	/**
-	 * Collects all execution vertices with the state ASSIGNED from the given pipeline and deploys them on the assigned
-	 * {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
-	 * 
-	 * @param pipeline
-	 *        the execution pipeline to be deployed
-	 */
-	public void deployAssignedPipeline(final ExecutionPipeline pipeline) {
-
-		final JobID jobID = null;
-
-		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
-		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-
-		final Iterator<ExecutionVertex> it = pipeline.iterator();
-		while (it.hasNext()) {
-			findVerticesToBeDeployed(it.next(), verticesToBeDeployed, alreadyVisited);
+		
+		instance.markDead();
+		
+		// we only remove the instance from the pools here; nothing else is cleaned up in this method
+		synchronized (this.lock) {
+			// the instance must not be available anywhere any more
+			this.allInstances.remove(instance);
+			this.instancesWithAvailableResources.remove(instance);
 		}
-
-		if (!verticesToBeDeployed.isEmpty()) {
-
-			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-				.entrySet()
-				.iterator();
-
-			while (it2.hasNext()) {
-
-				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
-				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
-			}
+	}
+	
+	public int getNumberOfAvailableInstances() {
+		synchronized (lock) {
+			return allInstances.size();
 		}
 	}
-
+	
+	// --------------------------------------------------------------------------------------------
+	//  Scheduling
+	// --------------------------------------------------------------------------------------------
+	
 	/**
-	 * Collects all execution vertices with the state ASSIGNED starting from the given collection of start vertices and
-	 * deploys them on the assigned {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
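+	 * Schedules the given unit to an available resource. If no resource is currently available, this call
+	 * either blocks or passes the failure to the unit's task vertex, depending on the scheduler's configuration.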
 	 * 
-	 * @param startVertices
-	 *        the collection of execution vertices to start the deployment from
+	 * @param unit The unit to be scheduled.
 	 */
-	public void deployAssignedVertices(final Collection<ExecutionVertex> startVertices) {
-
-		JobID jobID = null;
-
-		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
-		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-
-		for (final ExecutionVertex startVertex : startVertices) {
-
-			if (jobID == null) {
-				jobID = startVertex.getExecutionGraph().getJobID();
-			}
-
-			findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
+	protected void scheduleNextUnit(ScheduledUnit unit) {
+		if (unit == null) {
+			throw new IllegalArgumentException("Unit to schedule must not be null.");
 		}
-
-		if (!verticesToBeDeployed.isEmpty()) {
-
-			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-				.entrySet()
-				.iterator();
-
-			while (it2.hasNext()) {
-
-				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
-				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
+		
+		// see if the resource Id has already an assigned resource
+		AllocatedSlot resource = this.allocatedSlots.get(unit.getSharedResourceId());
+		
+		if (resource == null) {
+			// not yet allocated. find a slot to schedule to
+			try {
+				resource = getResourceToScheduleUnit(unit, this.rejectIfNoResourceAvailable);
+				if (resource == null) {
+					throw new RuntimeException("Error: The resource to schedule to is null.");
+				}
+			}
+			catch (Exception e) {
+				// we cannot go on, the task needs to know what to do now.
+				unit.getTaskVertex().handleException(e);
+				return;
 			}
 		}
+		
+		resource.runTask(unit.getTaskVertex());
 	}
-
+	
 	/**
-	 * Collects all execution vertices with the state ASSIGNED starting from the input vertices of the current execution
-	 * stage and deploys them on the assigned {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
+	 * Acquires a resource to schedule the given unit to. This call may block if no
+	 * resource is currently available, or throw an exception, based on the given flag.
 	 * 
-	 * @param executionGraph
-	 *        the execution graph to collect the vertices from
+	 * @param unit The unit to find a resource for.
+	 * @param exceptionOnNoAvailability If true, this call should not block if no resource is available,
+	 *                                  but throw a {@link NoResourceAvailableException}.
+	 * @return The resource to schedule the execution of the given unit on.
+	 * 
+	 * @throws NoResourceAvailableException If the {@code exceptionOnNoAvailability} flag is true and the scheduler
+	 *                                      has currently no resources available.
 	 */
-	public void deployAssignedInputVertices(final ExecutionGraph executionGraph) {
-
-		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
-		final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
-
-		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-
-		for (int i = 0; i < executionStage.getNumberOfStageMembers(); ++i) {
-
-			final ExecutionGroupVertex startVertex = executionStage.getStageMember(i);
-			if (!startVertex.isInputVertex()) {
-				continue;
-			}
-
-			for (int j = 0; j < startVertex.getCurrentNumberOfGroupMembers(); ++j) {
-				final ExecutionVertex vertex = startVertex.getGroupMember(j);
-				findVerticesToBeDeployed(vertex, verticesToBeDeployed, alreadyVisited);
-			}
-		}
-
-		if (!verticesToBeDeployed.isEmpty()) {
-
-			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-				.entrySet()
-				.iterator();
-
-			while (it2.hasNext()) {
-
-				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
-				this.deploymentManager.deploy(executionGraph.getJobID(), entry.getKey(), entry.getValue());
-			}
-		}
-	}
-
-
-	@Override
-	public void resourcesAllocated(final JobID jobID, final List<AllocatedResource> allocatedResources) {
-
-		if (allocatedResources == null) {
-			LOG.error("Resource to lock is null!");
-			return;
-		}
-
-		for (final AllocatedResource allocatedResource : allocatedResources) {
-			if (allocatedResource.getInstance() instanceof DummyInstance) {
-				LOG.debug("Available instance is of type DummyInstance!");
-				return;
-			}
-		}
-
-		final ExecutionGraph eg = getExecutionGraphByID(jobID);
-
-		if (eg == null) {
-			/*
-			 * The job have have been canceled in the meantime, in this case
-			 * we release the instance immediately.
-			 */
-			try {
-				for (final AllocatedResource allocatedResource : allocatedResources) {
-					getInstanceManager().releaseAllocatedResource(allocatedResource);
-				}
-			} catch (InstanceException e) {
-				LOG.error("InstanceException while releasing allocated ressources.", e);
-			}
-			return;
-		}
-
-		final Runnable command = new Runnable() {
-
-			/**
-			 * {@inheritDoc}
-			 */
-			@Override
-			public void run() {
-
-				final ExecutionStage stage = eg.getCurrentExecutionStage();
-
-				synchronized (stage) {
-
-					for (final AllocatedResource allocatedResource : allocatedResources) {
-
-						AllocatedResource resourceToBeReplaced = null;
-						// Important: only look for instances to be replaced in the current stage
-						final Iterator<ExecutionGroupVertex> groupIterator = new ExecutionGroupVertexIterator(eg, true,
-							stage.getStageNumber());
-						while (groupIterator.hasNext()) {
-
-							final ExecutionGroupVertex groupVertex = groupIterator.next();
-							for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
-
-								final ExecutionVertex vertex = groupVertex.getGroupMember(i);
-
-								if (vertex.getExecutionState() == ExecutionState.SCHEDULED
-									&& vertex.getAllocatedResource() != null) {
-										resourceToBeReplaced = vertex.getAllocatedResource();
-										break;
-								}
-							}
-
-							if (resourceToBeReplaced != null) {
-								break;
+	protected AllocatedSlot getResourceToScheduleUnit(ScheduledUnit unit, boolean exceptionOnNoAvailability) 
+		throws NoResourceAvailableException
+	{
+		AllocatedSlot slot = null;
+		
+		while (true) {
+			synchronized (this.lock) {
+				Instance instanceToUse = this.instancesWithAvailableResources.poll();
+				
+				// if there is nothing, throw an exception or wait, depending on what is configured
+				if (instanceToUse == null) {
+					if (exceptionOnNoAvailability) {
+						throw new NoResourceAvailableException(unit);
+					}
+					else {
+						try {
+							// wait until an instance shows up or the scheduler is shut down;
+							// the shutdown flag is re-checked at least every 2000 ms
+							do {
+								this.lock.wait(2000);
 							}
+							while (!shutdown.get() && 
+									(instanceToUse = this.instancesWithAvailableResources.poll()) == null);
 						}
-
-						// For some reason, we don't need this instance
-						if (resourceToBeReplaced == null) {
-							LOG.error("Instance " + allocatedResource.getInstance() + " is not required for job"
-								+ eg.getJobID());
-							try {
-								getInstanceManager().releaseAllocatedResource(allocatedResource);
-							} catch (InstanceException e) {
-								LOG.error("InstanceException while releasing allocated ressources.", e);
-							}
-							return;
-						}
-
-						// Replace the selected instance
-						final Iterator<ExecutionVertex> it = resourceToBeReplaced.assignedVertices();
-						while (it.hasNext()) {
-							final ExecutionVertex vertex = it.next();
-							vertex.setAllocatedResource(allocatedResource);
-							vertex.updateExecutionState(ExecutionState.ASSIGNED);
+						catch (InterruptedException e) {
+							throw new NoResourceAvailableException("The scheduler was interrupted.");
 						}
 					}
 				}
-
-				// Deploy the assigned vertices
-				deployAssignedInputVertices(eg);
-
-			}
-
-		};
-
-		eg.executeCommand(command);
-	}
-
-	/**
-	 * Checks if the given {@link AllocatedResource} is still required for the
-	 * execution of the given execution graph. If the resource is no longer
-	 * assigned to a vertex that is either currently running or about to run
-	 * the given resource is returned to the instance manager for deallocation.
-	 * 
-	 * @param executionGraph
-	 *        the execution graph the provided resource has been used for so far
-	 * @param allocatedResource
-	 *        the allocated resource to check the assignment for
-	 */
-	public void checkAndReleaseAllocatedResource(final ExecutionGraph executionGraph,
-			final AllocatedResource allocatedResource) {
-
-		if (allocatedResource == null) {
-			LOG.error("Resource to lock is null!");
-			return;
-		}
-
-		if (allocatedResource.getInstance() instanceof DummyInstance) {
-			LOG.debug("Available instance is of type DummyInstance!");
-			return;
-		}
-
-		boolean resourceCanBeReleased = true;
-		final Iterator<ExecutionVertex> it = allocatedResource.assignedVertices();
-		while (it.hasNext()) {
-			final ExecutionVertex vertex = it.next();
-			final ExecutionState state = vertex.getExecutionState();
-
-			if (state != ExecutionState.CREATED && state != ExecutionState.FINISHED
-				&& state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
-
-				resourceCanBeReleased = false;
-				break;
+				
+				// the wait loop above also ends when the scheduler is shut down. in that case no
+				// instance was acquired, so report it instead of dereferencing null below
+				if (instanceToUse == null) {
+					throw new NoResourceAvailableException("The scheduler was shut down.");
+				}
+				
+				// at this point, we have an instance. request a slot from the instance
+				try {
+					slot = instanceToUse.allocateSlot(unit.getJobId(), unit.getSharedResourceId());
+				}
+				catch (InstanceDiedException e) {
+					// the instance died, but this has not yet been propagated to this scheduler.
+					// remove the instance from the set of all instances
+					this.allInstances.remove(instanceToUse);
+					
+					// a dead instance must never be re-offered, whatever it reports about its
+					// free resources, so skip the re-add below and try the next instance
+					continue;
+				}
+				
+				// if the instance has further available slots, re-add it to the set of available
+				// resources, so that subsequent units can be scheduled to it
+				if (instanceToUse.hasResourcesAvailable()) {
+					this.instancesWithAvailableResources.add(instanceToUse);
+				}
+				
+				if (slot != null) {
+					return slot;
+				}
+				// else fall through the loop
 			}
 		}
-
-		if (resourceCanBeReleased) {
-
-			LOG.info("Releasing instance " + allocatedResource.getInstance());
+	}
+	
+	/**
+	 * The main loop of the scheduler thread. Takes units from the task queue one at a time and
+	 * schedules them via {@link #scheduleNextUnit(ScheduledUnit)}.
+	 * 
+	 * The loop ends when the shutdown flag is set or when the termination signal unit is taken
+	 * from the queue. Failures during scheduling are ignored while shutting down; otherwise
+	 * they are rethrown and end the loop.
+	 */
+	protected void runSchedulerLoop() {
+		// while the scheduler is alive
+		while (!shutdown.get()) {
+			
+			// get the next unit
+			ScheduledUnit next = null;
 			try {
-				getInstanceManager().releaseAllocatedResource(allocatedResource);
-			} catch (InstanceException e) {
-				LOG.error(StringUtils.stringifyException(e));
+				next = this.taskQueue.take();
 			}
-		}
-	}
-
-	DeploymentManager getDeploymentManager() {
-		return this.deploymentManager;
-	}
-
-	protected void replayCheckpointsFromPreviousStage(final ExecutionGraph executionGraph) {
-
-		final int currentStageIndex = executionGraph.getIndexOfCurrentExecutionStage();
-		final ExecutionStage previousStage = executionGraph.getStage(currentStageIndex - 1);
-
-		final List<ExecutionVertex> verticesToBeReplayed = new ArrayList<ExecutionVertex>();
-
-		for (int i = 0; i < previousStage.getNumberOfOutputExecutionVertices(); ++i) {
-
-			final ExecutionVertex vertex = previousStage.getOutputExecutionVertex(i);
-			vertex.updateExecutionState(ExecutionState.ASSIGNED);
-			verticesToBeReplayed.add(vertex);
-		}
-
-		deployAssignedVertices(verticesToBeReplayed);
-	}
-
-	/**
-	 * Returns a map of vertices to be restarted once they have switched to their <code>CANCELED</code> state.
-	 * 
-	 * @return the map of vertices to be restarted
-	 */
-	Map<ExecutionVertexID, ExecutionVertex> getVerticesToBeRestarted() {
-
-		return this.verticesToBeRestarted;
-	}
-
-
-	@Override
-	public void allocatedResourcesDied(final JobID jobID, final List<AllocatedResource> allocatedResources) {
-
-		final ExecutionGraph eg = getExecutionGraphByID(jobID);
-
-		if (eg == null) {
-			LOG.error("Cannot find execution graph for job with ID " + jobID);
-			return;
-		}
-
-		final Runnable command = new Runnable() {
-
-			/**
-			 * {@inheritDoc}
-			 */
-			@Override
-			public void run() {
-
-				synchronized (eg) {
-
-					for (final AllocatedResource allocatedResource : allocatedResources) {
-
-						LOG.info("Resource " + allocatedResource.getInstance().getName() + " for Job " + jobID
-							+ " died.");
-
-						final ExecutionGraph executionGraph = getExecutionGraphByID(jobID);
-
-						if (executionGraph == null) {
-							LOG.error("Cannot find execution graph for job " + jobID);
-							return;
-						}
-
-						Iterator<ExecutionVertex> vertexIter = allocatedResource.assignedVertices();
-
-						// Assign vertices back to a dummy resource.
-						final DummyInstance dummyInstance = DummyInstance.createDummyInstance();
-						final AllocatedResource dummyResource = new AllocatedResource(dummyInstance,
-								new AllocationID());
-
-						while (vertexIter.hasNext()) {
-							final ExecutionVertex vertex = vertexIter.next();
-							vertex.setAllocatedResource(dummyResource);
-						}
-
-						final String failureMessage = allocatedResource.getInstance().getName() + " died";
-
-						vertexIter = allocatedResource.assignedVertices();
-
-						while (vertexIter.hasNext()) {
-							final ExecutionVertex vertex = vertexIter.next();
-							final ExecutionState state = vertex.getExecutionState();
-
-							switch (state) {
-							case ASSIGNED:
-							case READY:
-							case STARTING:
-							case RUNNING:
-							case FINISHING:
-
-							vertex.updateExecutionState(ExecutionState.FAILED, failureMessage);
-
-							break;
-						default:
-							}
-					}
-
-					// TODO: Fix this
-					/*
-					 * try {
-					 * requestInstances(this.executionVertex.getGroupVertex().getExecutionStage());
-					 * } catch (InstanceException e) {
-					 * e.printStackTrace();
-					 * // TODO: Cancel the entire job in this case
-					 * }
-					 */
+			catch (InterruptedException e) {
+				if (shutdown.get()) {
+					return;
+				} else {
+					LOG.error("Scheduling loop was interrupted.");
+					// nothing was taken from the queue, so there is no unit to schedule.
+					// go back to waiting instead of passing null to scheduleNextUnit
+					continue;
 				}
 			}
-
-			final InternalJobStatus js = eg.getJobStatus();
-			if (js != InternalJobStatus.FAILING && js != InternalJobStatus.FAILED) {
-
-				// TODO: Fix this
-				// deployAssignedVertices(eg);
-
-				final ExecutionStage stage = eg.getCurrentExecutionStage();
-
-				try {
-					requestInstances(stage);
-				} catch (InstanceException e) {
-					e.printStackTrace();
-					// TODO: Cancel the entire job in this case
+			
+			// if we see this special unit, it means we are done
+			// (identity comparison is intended, the signal is a unique marker object)
+			if (next == TERMINATION_SIGNAL) {
+				return;
+			}
+			
+			// deploy the next scheduling unit
+			try {
+				scheduleNextUnit(next);
+			}
+			catch (Throwable t) {
+				// ignore the errors in the presence of a shutdown
+				if (!shutdown.get()) {
+					if (t instanceof Error) {
+						throw (Error) t;
+					} else if (t instanceof RuntimeException) {
+						throw (RuntimeException) t;
+					} else {
+						throw new RuntimeException("Critical error in scheduler thread.", t);
+					}
 				}
 			}
 		}
-		};
-
-		eg.executeCommand(command);
 	}
+	
+	private static final ScheduledUnit TERMINATION_SIGNAL = new ScheduledUnit();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/InstanceFillDegreeComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/InstanceFillDegreeComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/InstanceFillDegreeComparator.java
new file mode 100644
index 0000000..47aadf9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/InstanceFillDegreeComparator.java
@@ -0,0 +1,31 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import java.util.Comparator;
+
+import org.apache.flink.runtime.instance.Instance;
+
+/**
+ * Compares instances by the fraction of their slots that is currently available
+ * (available slots divided by total slots). Instances with a smaller available
+ * fraction are ordered first.
+ * 
+ * Instances with the same fraction compare as equal ({@code 0}), so this ordering is
+ * not consistent with {@code equals}. It should therefore not be the only key of a
+ * sorted set or map that has to hold several instances with the same fill degree.
+ */
+public class InstanceFillDegreeComparator implements Comparator<Instance> {
+
+	/**
+	 * @param o1 The first instance.
+	 * @param o2 The second instance.
+	 * @return A negative value, zero, or a positive value, if the available fraction of the
+	 *         first instance is smaller than, equal to, or larger than that of the second.
+	 */
+	@Override
+	public int compare(Instance o1, Instance o2) {
+		float fraction1 = o1.getNumberOfAvailableSlots() / (float) o1.getTotalNumberOfSlots();
+		float fraction2 = o2.getNumberOfAvailableSlots() / (float) o2.getTotalNumberOfSlots();
+		
+		// Float.compare yields 0 for equal fractions and a consistent order for NaN (0 of 0 slots).
+		// The comparator contract needs sgn(compare(a, b)) == -sgn(compare(b, a)), which a
+		// result that is only ever -1 or 1 cannot satisfy for equal values
+		return Float.compare(fraction1, fraction2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LifoSetQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LifoSetQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LifoSetQueue.java
new file mode 100644
index 0000000..d8bb852
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LifoSetQueue.java
@@ -0,0 +1,110 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+
+/**
+ * A queue that returns elements in LIFO order and simultaneously maintains set characteristics, i.e., elements
+ * that are already in the queue may not be added another time. Adding an element that is already contained
+ * leaves the queue unchanged (the element keeps its position) and is still reported as successful.
+ * 
+ * The queue does not accept {@code null} elements and is not synchronized.
+ *
+ * @param <E> The type of the elements in the queue.
+ */
+public class LifoSetQueue<E> extends AbstractQueue<E> implements Queue<E> {
+
+	/** The elements in insertion order; the last element is the head of the queue. */
+	private final ArrayList<E> lifo = new ArrayList<E>();
+	
+	/** The same elements as in the list, used to detect duplicates. */
+	private final HashSet<E> set = new HashSet<E>();
+	
+	/**
+	 * Adds the element as the new head of the queue, unless it is already contained.
+	 * 
+	 * @param e The element to add.
+	 * @return Always {@code true}, also if the element was already contained. Returning
+	 *         {@code false} would make {@code add(e)} throw an exception for duplicates.
+	 * @throws NullPointerException Thrown, if the element is null.
+	 */
+	@Override
+	public boolean offer(E e) {
+		if (e == null) {
+			throw new NullPointerException();
+		}
+		
+		if (set.add(e)) {
+			lifo.add(e);
+		}
+		
+		return true;
+	}
+
+	/**
+	 * Removes and returns the most recently added element.
+	 * 
+	 * @return The head of the queue, or {@code null}, if the queue is empty.
+	 */
+	@Override
+	public E poll() {
+		int size = lifo.size();
+		if (size > 0) {
+			E element = lifo.remove(size-1);
+			set.remove(element);
+			return element;
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Returns the most recently added element without removing it.
+	 * 
+	 * @return The head of the queue, or {@code null}, if the queue is empty.
+	 */
+	@Override
+	public E peek() {
+		int size = lifo.size();
+		if (size > 0) {
+			return lifo.get(size-1);
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Creates an iterator that returns the elements in LIFO order, i.e., in the order in which
+	 * {@link #poll()} would return them. The iterator supports element removal.
+	 * 
+	 * @return An iterator over the elements of this queue.
+	 */
+	@Override
+	public Iterator<E> iterator() {
+		return new Iterator<E>() {
+			
+			// iteration runs from the end of the list towards the beginning, so removing the
+			// element returned last never shifts the elements that are still to be visited
+			private int currentPos = lifo.size() - 1;
+			private int posToRemove = -1;
+
+			@Override
+			public boolean hasNext() {
+				return currentPos >= 0;
+			}
+
+			@Override
+			public E next() {
+				if (!hasNext()) {
+					throw new NoSuchElementException();
+				} else {
+					posToRemove = currentPos;
+					return lifo.get(currentPos--);
+				}
+			}
+
+			@Override
+			public void remove() {
+				// the iterator contract requires an IllegalStateException if next() has not
+				// been called yet, or if remove() was already called after the last next()
+				if (posToRemove == -1) {
+					throw new IllegalStateException();
+				}
+				
+				E element = lifo.remove(posToRemove);
+				set.remove(element);
+				
+				// forget the position, otherwise a second remove() would silently
+				// delete whichever element now sits at that index
+				posToRemove = -1;
+			}
+		};
+	}
+
+	@Override
+	public int size() {
+		return lifo.size();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
new file mode 100644
index 0000000..2b76d26
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -0,0 +1,33 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+/**
+ * Checked exception of the scheduler package. Going by its name and its default message, it
+ * signals that no resource was available for a unit that should be scheduled.
+ */
+public class NoResourceAvailableException extends Exception {
+
+	private static final long serialVersionUID = -2249953165298717803L;
+
+	/**
+	 * Creates the exception without a message.
+	 */
+	public NoResourceAvailableException() {}
+
+	/**
+	 * Creates the exception with a default message that names the affected unit.
+	 *
+	 * @param unit The unit to include in the message. Its string representation is appended as is.
+	 */
+	public NoResourceAvailableException(ScheduledUnit unit) {
+		super("No resource available to schedule unit " + unit);
+	}
+
+	/**
+	 * Creates the exception with the given message.
+	 *
+	 * @param message The exception message.
+	 */
+	public NoResourceAvailableException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ResourceId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ResourceId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ResourceId.java
new file mode 100644
index 0000000..58833f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ResourceId.java
@@ -0,0 +1,20 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.AbstractID;
+
+/**
+ * A dedicated ID type that adds no state or behavior on top of {@link AbstractID}.
+ * {@code ScheduledUnit} carries one and exposes it through {@code getSharedResourceId()}.
+ * NOTE(review): presumably it identifies a resource that several scheduled units may share;
+ * confirm against the scheduler.
+ */
+public class ResourceId extends AbstractID {}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
new file mode 100644
index 0000000..8caf64a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.jobgraph.JobID;
+
+public class ScheduledUnit {
+	
+	private final JobID jobId;
+	
+	private final ExecutionVertex2 taskVertex;
+	
+	private final ResourceId resourceId;
+	
+	
+	public ScheduledUnit(JobID jobId, ExecutionVertex2 taskVertex) {
+		this(jobId, taskVertex, new ResourceId());
+	}
+	
+	public ScheduledUnit(JobID jobId, ExecutionVertex2 taskVertex, ResourceId resourceId) {
+		if (jobId == null || taskVertex == null || resourceId == null) {
+			throw new NullPointerException();
+		}
+		
+		this.jobId = jobId;
+		this.taskVertex = taskVertex;
+		this.resourceId = resourceId;
+	}
+	
+	ScheduledUnit() {
+		this.jobId = null;
+		this.taskVertex = null;
+		this.resourceId = null;
+	}
+
+	
+	public JobID getJobId() {
+		return jobId;
+	}
+	
+	public ExecutionVertex2 getTaskVertex() {
+		return taskVertex;
+	}
+	
+	public ResourceId getSharedResourceId() {
+		return resourceId;
+	}
+	
+	@Override
+	public String toString() {
+		return "(job=" + jobId + ", resourceId=" + resourceId + ", vertex=" + taskVertex + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingException.java
deleted file mode 100644
index 7187cfb..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-/**
- * Scheduling exceptions are thrown to indicate problems or errors
- * related to Nephele's scheduler.
- * 
- */
-public class SchedulingException extends Exception {
-
-	/**
-	 * Generated serial version UID.
-	 */
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Constructs a new scheduling exception object.
-	 * 
-	 * @param msg
-	 *        the error message of the exception
-	 */
-	public SchedulingException(String msg) {
-		super(msg);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingStrategy.java
new file mode 100644
index 0000000..3bca46b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingStrategy.java
@@ -0,0 +1,33 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+/**
+ * The scheduling strategy describes how the scheduler distributes tasks across resources.
+ */
+public enum SchedulingStrategy {
+	
+	/**
+	 * This strategy tries to keep all machines utilized roughly the same.
+	 */
+	SPREAD_OUT_TASKS,
+	
+	/**
+	 * This strategy will put as many tasks as possible on one machine, before putting
+	 * tasks on the next machine.
+	 */
+	CLUSTER_TASKS
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java
index 85ede42..9c46ac5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.protocols;
 
 import java.io.IOException;
@@ -24,41 +23,34 @@ import java.io.IOException;
 import org.apache.flink.core.protocols.VersionedProtocol;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
-import org.apache.flink.runtime.types.IntegerRecord;
 
 /**
  * The job manager protocol is implemented by the job manager and offers functionality
  * to task managers which allows them to register themselves, send heart beat messages
  * or to report the results of a task execution.
- * 
  */
 public interface JobManagerProtocol extends VersionedProtocol {
 
 	/**
 	 * Sends a heart beat to the job manager.
 	 * 
-	 * @param instanceConnectionInfo
-	 *        the information the job manager requires to connect to the instance's task manager
-	 * @throws IOException
-	 *         thrown if an error occurs during this remote procedure call
+	 * @param taskManagerId The ID identifying the task manager.
+	 * @throws IOException Thrown if an error occurs during this remote procedure call.
 	 */
-	void sendHeartbeat(InstanceConnectionInfo instanceConnectionInfo)
-			throws IOException;
+	boolean sendHeartbeat(InstanceID taskManagerId) throws IOException;
 
 	/**
 	 * Registers a task manager at the JobManager.
 	 *
 	 * @param instanceConnectionInfo the information the job manager requires to connect to the instance's task manager
 	 * @param hardwareDescription a hardware description with details on the instance's compute resources.
-	 * @throws IOException
+	 * @param numberOfSlots The number of task slots that the TaskManager provides.
 	 *
-	 * @return whether the task manager was successfully registered
+	 * @return The ID under which the TaskManager is registered. Null, if the JobManager does not register the TaskManager.
 	 */
-	RegisterTaskManagerResult registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
-						HardwareDescription hardwareDescription, IntegerRecord numberOfSlots)
-			throws IOException;
+	InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) throws IOException;
 
 	/**
 	 * Reports an update of a task's execution state to the job manager.