You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/27 17:01:20 UTC

[2/7] flink git commit: [FLINK-1952] [jobmanager] Rework and fix slot sharing scheduler

http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 9b2000b..579a6b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -36,27 +36,39 @@ import akka.dispatch.Futures;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.instance.SharedSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceDiedException;
 import org.apache.flink.runtime.instance.InstanceListener;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * The scheduler is responsible for distributing the ready-to-run tasks and assigning them to instances and
- * slots.
+ * The scheduler is responsible for distributing the ready-to-run tasks among instances and slots.
+ * 
+ * <p>The scheduler supports two scheduling modes:</p>
+ * <ul>
+ *     <li>Immediate scheduling: A request for a task slot immediately returns a task slot, if one is
+ *         available, or throws a {@link NoResourceAvailableException}.</li>
+ *     <li>Queued Scheduling: A request for a task slot is queued and returns a future that will be
+ *         fulfilled as soon as a slot becomes available.</li>
+ * </ul>
  */
 public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 
-	static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
+	/** Scheduler-wide logger */
+	private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
 	
 	
+	/** All modifications to the scheduler structures are performed under a global scheduler lock */
 	private final Object globalLock = new Object();
 	
 	/** All instances that the scheduler can deploy to */
@@ -71,20 +83,24 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	/** All tasks pending to be scheduled */
 	private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();
 	
-	private final BlockingQueue<Instance> newlyAvailableInstances;
 	
+	private final BlockingQueue<Instance> newlyAvailableInstances = new LinkedBlockingQueue<Instance>();
 	
+	/** The number of slot allocations that had no location preference */
 	private int unconstrainedAssignments;
-	
+
+	/** The number of slot allocations where locality could be respected */
 	private int localizedAssignments;
-	
+
+	/** The number of slot allocations where locality could not be respected */
 	private int nonLocalizedAssignments;
-	
-	
-	public Scheduler() {
-		this.newlyAvailableInstances = new LinkedBlockingQueue<Instance>();
-	}
-	
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new scheduler.
+	 */
+	public Scheduler() {}
 	
 	/**
 	 * Shuts the scheduler down. After shut down no more tasks can be added to the scheduler.
@@ -102,9 +118,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		}
 	}
 
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
 	//  Scheduling
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
 	
 	public SimpleSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
 		Object ret = scheduleTask(task, false);
@@ -130,13 +146,12 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	}
 	
 	/**
-	 * Returns either an {@link org.apache.flink.runtime.instance.SimpleSlot}, or an {@link SlotAllocationFuture}.
+	 * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link SlotAllocationFuture}.
 	 */
 	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
 		if (task == null) {
-			throw new IllegalArgumentException();
+			throw new NullPointerException();
 		}
-		
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Scheduling task " + task);
 		}
@@ -148,14 +163,16 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 									preferredLocations != null && preferredLocations.iterator().hasNext();
 	
 		synchronized (globalLock) {
-		
-			// 1)  === If the task has a slot sharing group, schedule with shared slots ===
 			
 			SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
+			
 			if (sharingUnit != null) {
+
+				// 1)  === If the task has a slot sharing group, schedule with shared slots ===
 				
 				if (queueIfNoResource) {
-					throw new IllegalArgumentException("A task with a vertex sharing group was scheduled in a queued fashion.");
+					throw new IllegalArgumentException(
+							"A task with a vertex sharing group was scheduled in a queued fashion.");
 				}
 				
 				final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
@@ -163,12 +180,12 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				
 				// sanity check that we do not use an externally forced location and a co-location constraint together
 				if (constraint != null && forceExternalLocation) {
-					throw new IllegalArgumentException("The scheduling cannot be contrained simultaneously by a "
-							+ "co-location constriaint and an external location constraint.");
+					throw new IllegalArgumentException("The scheduling cannot be constrained simultaneously by a "
+							+ "co-location constraint and an external location constraint.");
 				}
 				
 				// get a slot from the group, if the group has one for us (and can fulfill the constraint)
-				SimpleSlot slotFromGroup;
+				final SimpleSlot slotFromGroup;
 				if (constraint == null) {
 					slotFromGroup = assignment.getSlotForTask(vertex);
 				}
@@ -182,74 +199,87 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				// the following needs to make sure any allocated slot is released in case of an error
 				try {
 					
-					// check whether the slot from the group is already what we want
-					if (slotFromGroup != null) {
-						// local (or unconstrained in the current group)
-						if (slotFromGroup.getLocality() != Locality.NON_LOCAL) {
-							updateLocalityCounters(slotFromGroup.getLocality());
-							return slotFromGroup;
+					// check whether the slot from the group is already what we want.
+					// any slot that is local, or where the assignment was unconstrained is good!
+					if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) {
+						
+						// if this is the first slot for the co-location constraint, we lock
+						// the location, because we are quite happy with the slot
+						if (constraint != null && !constraint.isAssigned()) {
+							constraint.lockLocation();
 						}
+						
+						updateLocalityCounters(slotFromGroup.getLocality(), vertex, slotFromGroup.getInstance());
+						return slotFromGroup;
 					}
 					
-					final Iterable<Instance> locations = (constraint == null || constraint.isUnassigned()) ?
-							vertex.getPreferredLocations() : Collections.singleton(constraint.getLocation());
+					// the group did not have a local slot for us. see if we can get one (or a better one)
+					
+					// our location preference is either determined by the location constraint, or by the
+					// vertex's preferred locations
+					final Iterable<Instance> locations;
+					final boolean localOnly;
+					if (constraint != null && constraint.isAssigned()) {
+						locations = Collections.singleton(constraint.getLocation());
+						localOnly = true;
+					}
+					else {
+						locations = vertex.getPreferredLocations();
+						localOnly = forceExternalLocation;
+					}
 					
-					// get a new slot, since we could not place it into the group, or we could not place it locally
-					newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint, forceExternalLocation);
+					newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly);
 
 					if (newSlot == null) {
 						if (slotFromGroup == null) {
-							// both null
-							if (constraint == null || constraint.isUnassigned()) {
-								if (forceExternalLocation) {
-									// could not satisfy the external location constraint
-									String hosts = getHostnamesFromInstances(preferredLocations);
-									throw new NoResourceAvailableException("Could not schedule task " + vertex
-											+ " to any of the required hosts: " + hosts);
-								}
-								else {
-									// simply nothing is available
-									throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(),
-											getTotalNumberOfSlots(), getNumberOfAvailableSlots());
-								}
+							// both null, which means there is nothing available at all
+							
+							if (constraint != null && constraint.isAssigned()) {
+								// nothing is available on the node where the co-location constraint forces us to
+								throw new NoResourceAvailableException("Could not allocate a slot on instance " +
+										constraint.getLocation() + ", as required by the co-location constraint.");
+							}
+							else if (forceExternalLocation) {
+								// could not satisfy the external location constraint
+								String hosts = getHostnamesFromInstances(preferredLocations);
+								throw new NoResourceAvailableException("Could not schedule task " + vertex
+										+ " to any of the required hosts: " + hosts);
 							}
 							else {
-								// nothing is available on the node where the co-location constraint pushes us
-								throw new NoResourceAvailableException("Could not allocate a slot on instance " + 
-											constraint.getLocation() + ", as required by the co-location constraint.");
+								// simply nothing is available
+								throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(),
+										getTotalNumberOfSlots(), getNumberOfAvailableSlots());
 							}
-						} else {
-							// got a non-local from the group, and no new one
+						}
+						else {
+							// got a non-local from the group, and no new one, so we use the non-local
+							// slot from the sharing group
 							toUse = slotFromGroup;
 						}
 					}
-					else if (slotFromGroup == null || newSlot.getLocality() == Locality.LOCAL) {
-						// new slot is preferable
+					else if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
+						// if there is no slot from the group, it is no longer alive, or the
+						// new slot is local, then we use the new slot
 						if (slotFromGroup != null) {
 							slotFromGroup.releaseSlot();
 						}
-						
 						toUse = newSlot;
 					}
 					else {
-						// both are available and usable. neither is local
+						// both are available and usable. neither is local. in that case, we may
+						// as well use the slot from the sharing group, to minimize the number of
+						// instances that the job occupies
 						newSlot.releaseSlot();
 						toUse = slotFromGroup;
 					}
-					
-					// assign to the co-location hint, if we have one and it is unassigned
-					// if it was assigned before and the new one is not local, it is a fail
-					if (constraint != null) {
-						if (constraint.isUnassigned() || toUse.getLocality() == Locality.LOCAL) {
-							constraint.setSharedSlot(toUse.getParent());
-						} else {
-							// the fail
-							throw new NoResourceAvailableException("Could not allocate a slot on instance " + 
-									constraint.getLocation() + ", as required by the co-location constraint.");
-						}
+
+					// if this is the first slot for the co-location constraint, we lock
+					// the location, because we are going to use that slot
+					if (constraint != null && !constraint.isAssigned()) {
+						constraint.lockLocation();
 					}
 					
-					updateLocalityCounters(toUse.getLocality());
+					updateLocalityCounters(toUse.getLocality(), vertex, toUse.getInstance());
 				}
 				catch (NoResourceAvailableException e) {
 					throw e;
@@ -266,11 +296,14 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				}
 
 				return toUse;
-			} else {
+			}
+			else {
+				
 				// 2) === schedule without hints and sharing ===
+				
 				SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
 				if (slot != null) {
-					updateLocalityCounters(slot.getLocality());
+					updateLocalityCounters(slot.getLocality(), vertex, slot.getInstance());
 					return slot;
 				}
 				else {
@@ -286,30 +319,14 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 								+ " to any of the required hosts: " + hosts);
 					}
 					else {
-						throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
+						throw new NoResourceAvailableException(getNumberOfAvailableInstances(),
+								getTotalNumberOfSlots(), getNumberOfAvailableSlots());
 					}
 				}
 			}
 		}
 	}
 	
-	private String getHostnamesFromInstances(Iterable<Instance> instances) {
-		StringBuilder bld = new StringBuilder();
-		
-		for (Instance i : instances) {
-			bld.append(i.getInstanceConnectionInfo().getHostname());
-			bld.append(", ");
-		}
-		
-		if (bld.length() == 0) {
-			return "";
-		}
-		else {
-			bld.setLength(bld.length() - 2);
-			return bld.toString();
-		}
-	}
-	
 	/**
 	 * Gets a suitable instance to schedule the vertex execution to.
 	 * <p>
@@ -318,8 +335,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	 * @param vertex The task to run. 
 	 * @return The instance to run the vertex on, it {@code null}, if no instance is available.
 	 */
-	protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations, boolean localOnly) {
-		
+	protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
+											Iterable<Instance> requestedLocations,
+											boolean localOnly) {
 		// we need potentially to loop multiple times, because there may be false positives
 		// in the set-with-available-instances
 		while (true) {
@@ -332,18 +350,8 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 			Instance instanceToUse = instanceLocalityPair.getLeft();
 			Locality locality = instanceLocalityPair.getRight();
 
-			if (LOG.isDebugEnabled()){
-				if(locality == Locality.LOCAL){
-					LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
-				}else if(locality == Locality.NON_LOCAL){
-					LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
-				}else if(locality == Locality.UNCONSTRAINED) {
-					LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
-				}
-			}
-
 			try {
-				SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId(), vertex.getJobvertexId());
+				SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId());
 				
 				// if the instance has further available slots, re-add it to the set of available resources.
 				if (instanceToUse.hasResourcesAvailable()) {
@@ -365,54 +373,64 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		}
 	}
 
-	protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex,
-											Iterable<Instance> requestedLocations,
-											SlotSharingGroupAssignment groupAssignment,
-											CoLocationConstraint constraint,
-											boolean localOnly) {
+	/**
+	 * Tries to allocate a new slot for a vertex that is part of a slot sharing group. If one
+	 * of the instances has a slot available, the method will allocate it as a shared slot, add that
+	 * shared slot to the sharing group, and allocate a simple slot from that shared slot.
+	 * 
+	 * <p>This method will try to allocate a slot from one of the local instances, and fall back to
+	 * non-local instances, if permitted.</p>
+	 * 
+	 * @param vertex The vertex to allocate the slot for.
+	 * @param requestedLocations The locations that are considered local. May be null or empty, if the
+	 *                           vertex has no location preferences.
+	 * @param groupAssignment The slot sharing group of the vertex. Mandatory parameter.
+	 * @param constraint The co-location constraint of the vertex. May be null.
+	 * @param localOnly Flag to indicate if non-local choices are acceptable.
+	 * 
+	 * @return A sub-slot for the given vertex, or {@code null}, if no slot is available.
+	 */
+	protected SimpleSlot getNewSlotForSharingGroup(ExecutionVertex vertex,
+													Iterable<Instance> requestedLocations,
+													SlotSharingGroupAssignment groupAssignment,
+													CoLocationConstraint constraint,
+													boolean localOnly)
+	{
 		// we need potentially to loop multiple times, because there may be false positives
 		// in the set-with-available-instances
 		while (true) {
 			Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);
-
+			
 			if (instanceLocalityPair == null) {
+				// nothing is available
 				return null;
 			}
 
-			Instance instanceToUse = instanceLocalityPair.getLeft();
-			Locality locality = instanceLocalityPair.getRight();
-
-			if (LOG.isDebugEnabled()) {
-				if (locality == Locality.LOCAL) {
-					LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
-				} else if(locality == Locality.NON_LOCAL) {
-					LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
-				} else if(locality == Locality.UNCONSTRAINED) {
-					LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
-				}
-			}
+			final Instance instanceToUse = instanceLocalityPair.getLeft();
+			final Locality locality = instanceLocalityPair.getRight();
 
 			try {
-				AbstractID groupID = constraint == null ? vertex.getJobvertexId() : constraint.getGroupId();
-
-				// root SharedSlot
-				SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment, groupID);
+				JobVertexID groupID = vertex.getJobvertexId();
+				
+				// allocate a shared slot from the instance
+				SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment);
 
 				// if the instance has further available slots, re-add it to the set of available resources.
 				if (instanceToUse.hasResourcesAvailable()) {
 					this.instancesWithAvailableResources.add(instanceToUse);
 				}
 
-				if(sharedSlot != null){
-					// If constraint != null, then slot nested in a SharedSlot nested in sharedSlot
-					// If constraint == null, then slot nested in sharedSlot
-					SimpleSlot slot = groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot,
-							locality, groupID, constraint);
+				if (sharedSlot != null) {
+					// add the shared slot to the assignment group and allocate a sub-slot
+					SimpleSlot slot = constraint == null ?
+							groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, groupID) :
+							groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, constraint);
 
-					if(slot != null){
+					if (slot != null) {
 						return slot;
-					} else {
-						// release shared slot
+					}
+					else {
+						// could not add and allocate the sub-slot, so release shared slot
 						sharedSlot.releaseSlot();
 					}
 				}
@@ -428,58 +446,56 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	}
 
 	/**
-	 * NOTE: This method is not thread-safe, it needs to be synchronized by the caller.
-	 *
 	 * Tries to find a requested instance. If no such instance is available it will return a non-
 	 * local instance. If no such instance exists (all slots occupied), then return null.
+	 * 
+	 * <p><b>NOTE:</b> This method is not thread-safe, it needs to be synchronized by the caller.</p>
 	 *
-	 * @param requestedLocations
+	 * @param requestedLocations The list of preferred instances. May be null or empty, which indicates that
+	 *                           no locality preference exists.   
+	 * @param localOnly Flag to indicate whether only one of the exact local instances can be chosen.  
 	 */
 	private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations, boolean localOnly){
 		
-		if (this.instancesWithAvailableResources.isEmpty()) {
-			// check if the asynchronous calls did not yet return the queues
+		// drain the queue of newly available instances
+		while (this.newlyAvailableInstances.size() > 0) {
 			Instance queuedInstance = this.newlyAvailableInstances.poll();
-			if (queuedInstance == null) {
-				return null;
-			} else {
+			if (queuedInstance != null) {
 				this.instancesWithAvailableResources.add(queuedInstance);
 			}
 		}
+		
+		// if nothing is available at all, return null
+		if (this.instancesWithAvailableResources.isEmpty()) {
+			return null;
+		}
 
 		Iterator<Instance> locations = requestedLocations == null ? null : requestedLocations.iterator();
 
-		Instance instanceToUse = null;
-		Locality locality = Locality.UNCONSTRAINED;
-
 		if (locations != null && locations.hasNext()) {
 			// we have a locality preference
 
 			while (locations.hasNext()) {
 				Instance location = locations.next();
-
 				if (location != null && this.instancesWithAvailableResources.remove(location)) {
-					instanceToUse = location;
-					locality = Locality.LOCAL;
-					break;
+					return new ImmutablePair<Instance, Locality>(location, Locality.LOCAL);
 				}
 			}
-
-			if (instanceToUse == null) {
-				if (localOnly) {
-					return null;
-				}
-				else {
-					instanceToUse = this.instancesWithAvailableResources.poll();
-					locality = Locality.NON_LOCAL;
-				}
+			
+			// no local instance available
+			if (localOnly) {
+				return null;
+			}
+			else {
+				Instance instanceToUse = this.instancesWithAvailableResources.poll();
+				return new ImmutablePair<Instance, Locality>(instanceToUse, Locality.NON_LOCAL);
 			}
 		}
 		else {
-			instanceToUse = this.instancesWithAvailableResources.poll();
+			// no location preference, so use some instance
+			Instance instanceToUse = this.instancesWithAvailableResources.poll();
+			return new ImmutablePair<Instance, Locality>(instanceToUse, Locality.UNCONSTRAINED);
 		}
-
-		return new ImmutablePair<Instance, Locality>(instanceToUse, locality);
 	}
 	
 	@Override
@@ -524,7 +540,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 				
 				try {
-					SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId(), vertex.getJobvertexId());
+					SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId());
 					if (newSlot != null) {
 						
 						// success, remove from the task queue and notify the future
@@ -554,7 +570,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		}
 	}
 	
-	private void updateLocalityCounters(Locality locality) {
+	private void updateLocalityCounters(Locality locality, ExecutionVertex vertex, Instance location) {
 		switch (locality) {
 		case UNCONSTRAINED:
 			this.unconstrainedAssignments++;
@@ -568,12 +584,22 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		default:
 			throw new RuntimeException(locality.name());
 		}
+		
+		if (LOG.isDebugEnabled()) {
+			switch (locality) {
+				case UNCONSTRAINED:
+					LOG.debug("Unconstrained assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+					break;
+				case LOCAL:
+					LOG.debug("Local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+					break;
+				case NON_LOCAL:
+					LOG.debug("Non-local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+					break;
+			}
+		}
 	}
 	
-	
-	
-	
-	
 	// --------------------------------------------------------------------------------------------
 	//  Instance Availability
 	// --------------------------------------------------------------------------------------------
@@ -646,7 +672,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		if (instance == null) {
 			throw new NullPointerException();
 		}
-		
+
 		allInstances.remove(instance);
 		instancesWithAvailableResources.remove(instance);
 		
@@ -755,7 +781,36 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 			}
 		}
 	}
+
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static String getHostnamesFromInstances(Iterable<Instance> instances) {
+		StringBuilder bld = new StringBuilder();
+
+		boolean successive = false;
+		for (Instance i : instances) {
+			if (successive) {
+				bld.append(", ");
+			} else {
+				successive = true;
+			}
+			bld.append(i.getInstanceConnectionInfo().getHostname());
+		}
+
+		return bld.toString();
+	}
 	
+	// ------------------------------------------------------------------------
+	//  Nested members
+	// ------------------------------------------------------------------------
+
+	/**
+	 * An entry in the queue of schedule requests. Contains the task to be scheduled and
+	 * the future that tracks the completion.
+	 */
 	private static final class QueuedTask {
 		
 		private final ScheduledUnit task;

http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
index dcde6b2..0fa1362 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 /**
@@ -47,7 +48,8 @@ public class SlotSharingGroup implements java.io.Serializable {
 			this.ids.add(id);
 		}
 	}
-	
+
+	// --------------------------------------------------------------------------------------------
 	
 	public void addVertexToGroup(JobVertexID id) {
 		this.ids.add(id);
@@ -79,7 +81,9 @@ public class SlotSharingGroup implements java.io.Serializable {
 		this.taskAssignment = null;
 	}
 	
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
 	
 	@Override
 	public String toString() {

http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
deleted file mode 100644
index 3c292f7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
+++ /dev/null
@@ -1,485 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.SharedSlot;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.Slot;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.slf4j.Logger;
-
-
-public class SlotSharingGroupAssignment {
-
-	private static final Logger LOG = Scheduler.LOG;
-
-	private final Object lock = new Object();
-
-	/** All slots currently allocated to this sharing group */
-	private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
-
-	/** The slots available per vertex type (jid), keyed by instance, to make them locatable */
-	private final Map<AbstractID, Map<Instance, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<AbstractID, Map<Instance, List<SharedSlot>>>();
-
-	// --------------------------------------------------------------------------------------------
-
-	public SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot sharedSlot, Locality locality,
-													AbstractID groupId, CoLocationConstraint constraint) {
-
-		final Instance location = sharedSlot.getInstance();
-
-		synchronized (lock) {
-			// add to the total bookkeeping
-			allSlots.add(sharedSlot);
-
-			SimpleSlot subSlot = null;
-
-			if (constraint == null) {
-				// allocate us a sub slot to return
-				subSlot = sharedSlot.allocateSubSlot(groupId);
-			} else {
-				// we need a colocation slot --> a SimpleSlot nested in a SharedSlot to host other colocated tasks
-				SharedSlot constraintGroupSlot = sharedSlot.allocateSharedSlot(groupId);
-
-				if(constraintGroupSlot == null) {
-					subSlot = null;
-				} else {
-					subSlot = constraintGroupSlot.allocateSubSlot(null);
-
-					// could not create a sub slot --> release constraintGroupSlot
-					if(subSlot == null){
-						constraintGroupSlot.releaseSlot();
-					}
-				}
-			}
-
-			// subSlot is null if sharedSlot is dead; this should never happen since we just created a fresh
-			// SharedSlot in the caller
-			if(subSlot == null) {
-				LOG.warn("Could not allocate a sub slot.");
-
-				return null;
-			} else {
-				// preserve the locality information
-				subSlot.setLocality(locality);
-
-				boolean entryForNewJidExists = false;
-
-				// let the other vertex types know about this one as well
-				for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
-
-					if (entry.getKey().equals(groupId)) {
-						entryForNewJidExists = true;
-						continue;
-					}
-
-					Map<Instance, List<SharedSlot>> available = entry.getValue();
-					putIntoMultiMap(available, location, sharedSlot);
-				}
-
-				// make sure an empty entry exists for this group, if no other entry exists
-				if (!entryForNewJidExists) {
-					availableSlotsPerJid.put(groupId, new LinkedHashMap<Instance, List<SharedSlot>>());
-				}
-
-				return subSlot;
-			}
-		}
-	}
-
-	/**
-	 * Gets a slot suitable for the given task vertex. This method will prefer slots that are local
-	 * (with respect to {@link ExecutionVertex#getPreferredLocations()}), but will return non local
-	 * slots if no local slot is available. The method returns null, when no slot is available for the
-	 * given JobVertexID at all.
-	 *
-	 * @param vertex
-	 *
-	 * @return A slot for a task with the given JobVertexID, or null, if none is available.
-	 */
-	public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
-		synchronized (lock) {
-			Pair<SharedSlot, Locality> p = getSlotForTaskInternal(vertex.getJobvertexId(), vertex, vertex.getPreferredLocations(), false);
-
-			if (p != null) {
-				SharedSlot ss = p.getLeft();
-				SimpleSlot slot = ss.allocateSubSlot(vertex.getJobvertexId());
-				slot.setLocality(p.getRight());
-				return slot;
-			}
-			else {
-				return null;
-			}
-		}
-
-	}
-
-	public SimpleSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
-
-		synchronized (lock) {
-			SharedSlot shared = constraint.getSharedSlot();
-
-			if (shared != null && !shared.isDead()) {
-				// initialized and set
-				SimpleSlot subslot = shared.allocateSubSlot(null);
-				subslot.setLocality(Locality.LOCAL);
-				return subslot;
-			}
-			else if (shared == null) {
-				// not initialized, grab a new slot. preferred locations are defined by the vertex
-				// we only associate the slot with the constraint, if it was a local match
-				Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(), vertex, vertex.getPreferredLocations(), false);
-
-				if (p == null) {
-					return null;
-				} else {
-					shared = p.getLeft();
-					Locality l = p.getRight();
-
-					// we need a colocation slot --> SimpleSlot nested in a SharedSlot to host other colocated tasks
-					SharedSlot constraintGroupSlot = shared.allocateSharedSlot(constraint.getGroupId());
-					// Depth=3 => groupID==null
-					SimpleSlot sub = constraintGroupSlot.allocateSubSlot(null);
-					sub.setLocality(l);
-
-					if (l != Locality.NON_LOCAL) {
-						constraint.setSharedSlot(constraintGroupSlot);
-					}
-					return sub;
-				}
-			}
-			else {
-				// disposed. get a new slot on the same instance
-				Instance location = shared.getInstance();
-				Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(), vertex, Collections.singleton(location), true);
-
-				if (p == null) {
-					return null;
-				} else {
-					shared = p.getLeft();
-					// we need colocation slot --> SimpleSlot nested in a SharedSlot to host other colocated tasks
-					SharedSlot constraintGroupSlot = shared.allocateSharedSlot(constraint.getGroupId());
-					constraint.setSharedSlot(constraintGroupSlot);
-					SimpleSlot subSlot = constraintGroupSlot.allocateSubSlot(null);
-					subSlot.setLocality(Locality.LOCAL);
-					return subSlot;
-				}
-			}
-		}
-	}
-
-	/**
-	 * NOTE: This method is not synchronized by itself, needs to be synchronized externally.
-	 *
-	 * @return A pair of the selected shared slot and its locality, or {@code null}, if no slot is available.
-	 */
-	private Pair<SharedSlot, Locality> getSlotForTaskInternal(AbstractID groupId, ExecutionVertex vertex, Iterable<Instance> preferredLocations, boolean localOnly) {
-		Map<Instance, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
-
-		if (allSlots.isEmpty()) {
-			return null;
-		}
-
-		// get the available slots for the group
-		if (slotsForGroup == null) {
-			// no task is yet scheduled for that group, so all slots are available
-			slotsForGroup = new LinkedHashMap<Instance, List<SharedSlot>>();
-			availableSlotsPerJid.put(groupId, slotsForGroup);
-
-			for (SharedSlot availableSlot : allSlots) {
-				putIntoMultiMap(slotsForGroup, availableSlot.getInstance(), availableSlot);
-			}
-		}
-		else if (slotsForGroup.isEmpty()) {
-			return null;
-		}
-
-		// check whether we can schedule the task to a preferred location
-		boolean didNotGetPreferred = false;
-
-		if (preferredLocations != null) {
-			for (Instance location : preferredLocations) {
-
-				// set the flag that we failed a preferred location. If one will be found,
-				// we return early anyways and skip the flag evaluation
-				didNotGetPreferred = true;
-
-				SharedSlot slot = removeFromMultiMap(slotsForGroup, location);
-				if (slot != null && !slot.isDead()) {
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Local assignment in shared group : " + vertex + " --> " + slot);
-					}
-
-					return new ImmutablePair<SharedSlot, Locality>(slot, Locality.LOCAL);
-				}
-			}
-		}
-
-		// if we want only local assignments, exit now with a "not found" result
-		if (didNotGetPreferred && localOnly) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("No local assignment in shared possible for " + vertex);
-			}
-			return null;
-		}
-
-		// schedule the task to any available location
-		SharedSlot slot = pollFromMultiMap(slotsForGroup);
-		if (slot != null && !slot.isDead()) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug((didNotGetPreferred ? "Non-local" : "Unconstrained") + " assignment in shared group : " + vertex + " --> " + slot);
-			}
-
-			return new ImmutablePair<SharedSlot, Locality>(slot, didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
-		}
-		else {
-			return null;
-		}
-	}
-
-	/**
-	 * Removes the shared slot from the assignment group.
-	 *
-	 * @param sharedSlot
-	 */
-	private void removeSharedSlot(SharedSlot sharedSlot){
-		if (!allSlots.contains(sharedSlot)) {
-			throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
-		}
-
-		allSlots.remove(sharedSlot);
-
-		Instance location = sharedSlot.getInstance();
-
-		for(Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> mapEntry: availableSlotsPerJid.entrySet()){
-			Map<Instance, List<SharedSlot>> map = mapEntry.getValue();
-
-			List<SharedSlot> list = map.get(location);
-
-			if(list == null || !list.remove(sharedSlot)){
-				throw new IllegalStateException("Bug: SharedSlot was not available to another vertex type that it was not allocated for before.");
-			}
-
-			if(list.isEmpty()){
-				map.remove(location);
-			}
-		}
-
-		sharedSlot.markCancelled();
-
-		returnAllocatedSlot(sharedSlot);
-	}
-
-	/**
-	 * Releases the shared slot from the assignment group.
-	 * @param sharedSlot The SharedSlot to be released
-	 */
-	public void releaseSharedSlot(SharedSlot sharedSlot){
-		synchronized (lock) {
-			Set<Slot> subSlots = sharedSlot.getSubSlots();
-
-			for(Slot subSlot: subSlots) {
-
-				subSlot.markDisposed();
-
-				if(subSlot instanceof SharedSlot){
-					releaseSharedSlot((SharedSlot) subSlot);
-				}else if(subSlot instanceof SimpleSlot){
-					releaseSimpleSlot((SimpleSlot) subSlot);
-				}
-			}
-
-			subSlots.clear();
-
-			returnSlot(sharedSlot);
-		}
-	}
-
-	/**
-	 * Releases the simple slot from the assignment group.
-	 * @param simpleSlot The SimpleSlot to be released
-	 */
-	public void releaseSimpleSlot(SimpleSlot simpleSlot){
-		synchronized (lock) {
-			simpleSlot.cancel();
-
-			returnSlot(simpleSlot);
-		}
-
-	}
-
-	/**
-	 * Removes the given slot from the assignment group. If the slot is a root object, then it has
-	 * to be a SharedSlot and it is removed from the availableSlotsPerJid field and the slot is
-	 * returned to the instance. If the slot is a sub slot of the root slot, then this sub slot
-	 * is marked available again for tasks of the same group. Otherwise, the slot is simply removed
-	 * from its parent if it is not already marked as disposed. If a slot is already marked to be
-	 * disposed, then the releasing was called from a parent slot which will take care of the
-	 * disposal.
-	 *
-	 * IMPORTANT: The method is not synchronized. The caller is responsible for that.
-	 *
-	 * @param slot The slot to be returned.
-	 */
-	private void returnSlot(Slot slot){
-		// each slot can only be returned once, if a slot is returned then it should no longer be used --> markDead
-		if(slot.markDead()) {
-			// slot is a root slot
-			if(slot.getParent() == null){
-				// only SharedSlots are allowed to be root slots in a SlotSharingGroupAssignment
-				if(slot instanceof SharedSlot){
-					removeSharedSlot((SharedSlot) slot);
-				} else {
-					throw new IllegalStateException("Simple slot cannot be returned from SlotSharingGroupAssignment.");
-				}
-			} else {
-				AbstractID groupID = slot.getGroupID();
-				SharedSlot parent = slot.getParent();
-
-				// Only colocation constraint slots (SimpleSlot nested in a SharedSlot nested in a SharedSlot) have a groupID==null
-				// One can also say, all nested slots deeper than 2 have a groupID==null
-				if(groupID != null){
-					if (!allSlots.contains(parent)) {
-						throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
-					}
-
-					// make the shared slot available again to tasks of the group it is available to
-					Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
-
-					// sanity check
-					if (slotsForJid == null) {
-						throw new IllegalStateException("Trying to return a slot for group " + groupID +
-								" when available slots indicated that all slots were available.");
-					}
-
-					putIntoMultiMap(slotsForJid, parent.getInstance(), parent);
-				}
-
-				// if no one else takes care of disposal, then remove the slot from the parent
-				if(slot.markDisposed()) {
-					if (slot.getParent().freeSubSlot(slot) == 0) {
-						releaseSharedSlot(slot.getParent());
-					}
-				}
-			}
-		}
-	}
-
-	private void returnAllocatedSlot(SharedSlot slot){
-		slot.getInstance().returnAllocatedSlot(slot);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  State
-	// --------------------------------------------------------------------------------------------
-	
-	public int getNumberOfSlots() {
-		return allSlots.size();
-	}
-	
-	public int getNumberOfAvailableSlotsForJid(JobVertexID jid) {
-		synchronized (lock) {
-			Map<Instance, List<SharedSlot>> available = availableSlotsPerJid.get(jid);
-			
-			if (available != null) {
-				Set<SharedSlot> set = new HashSet<SharedSlot>();
-				
-				for (List<SharedSlot> list : available.values()) {
-					for (SharedSlot slot : list) {
-						set.add(slot);
-					}
-				}
-				
-				return set.size();
-			} else {
-				return allSlots.size();
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	
-	
-	// --------------------------------------------------------------------------------------------
-	//  Utilities
-	// --------------------------------------------------------------------------------------------
-
-	private static final void putIntoMultiMap(Map<Instance, List<SharedSlot>> map, Instance location, SharedSlot slot) {
-		List<SharedSlot> slotsForInstance = map.get(location);
-		if (slotsForInstance == null) {
-			slotsForInstance = new ArrayList<SharedSlot>();
-			map.put(location, slotsForInstance);
-		}
-		slotsForInstance.add(slot);
-	}
-	
-	private static final SharedSlot removeFromMultiMap(Map<Instance, List<SharedSlot>> map, Instance location) {
-		List<SharedSlot> slotsForLocation = map.get(location);
-		
-		if (slotsForLocation == null) {
-			return null;
-		}
-		else {
-			SharedSlot slot = slotsForLocation.remove(slotsForLocation.size() - 1);
-			if (slotsForLocation.isEmpty()) {
-				map.remove(location);
-			}
-			
-			return slot;
-		}
-	}
-	
-	private static final SharedSlot pollFromMultiMap(Map<Instance, List<SharedSlot>> map) {
-		Iterator<Map.Entry<Instance, List<SharedSlot>>> iter = map.entrySet().iterator();
-		
-		while (iter.hasNext()) {
-			List<SharedSlot> slots = iter.next().getValue();
-			
-			if (slots.isEmpty()) {
-				iter.remove();
-			}
-			else if (slots.size() == 1) {
-				SharedSlot slot = slots.remove(0);
-				iter.remove();
-				return slot;
-			}
-			else {
-				return slots.remove(slots.size() - 1);
-			}
-		}
-		
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 42c3f84..a53c318 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -348,4 +348,4 @@ public class ExecutionGraphDeploymentTest {
 			throw new Exception();
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 1e78ac5..733ad11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -60,31 +60,31 @@ public class ExecutionStateProgressTest {
 		try {
 			final JobID jid = new JobID();
 			final JobVertexID vid = new JobVertexID();
-			
+
 			AbstractJobVertex ajv = new AbstractJobVertex("TestVertex", vid);
 			ajv.setParallelism(3);
 			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-			
+
 			ExecutionGraph graph = new ExecutionGraph(jid, "test job", new Configuration(),
 					AkkaUtils.getDefaultTimeout());
 			graph.attachJobGraph(Arrays.asList(ajv));
-			
+
 			setGraphStatus(graph, JobStatus.RUNNING);
-			
+
 			ExecutionJobVertex ejv = graph.getJobVertex(vid);
-			
+
 			// mock resources and mock taskmanager
 			ActorRef taskManager = system.actorOf(Props.create(SimpleAcknowledgingTaskManager.class));
 			for (ExecutionVertex ee : ejv.getTaskVertices()) {
 				SimpleSlot slot = getInstance(taskManager).allocateSimpleSlot(jid);
 				ee.deployToSlot(slot);
 			}
-			
+
 			// finish all
 			for (ExecutionVertex ee : ejv.getTaskVertices()) {
 				ee.executionFinished();
 			}
-			
+
 			assertTrue(ejv.isInFinalState());
 			assertEquals(JobStatus.FINISHED, graph.getState());
 		}
@@ -93,4 +93,4 @@ public class ExecutionStateProgressTest {
 			fail(e.getMessage());
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 757976d..eb0aab4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -45,13 +45,14 @@ import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
 public class ExecutionVertexCancelTest {
-	
+
 	private static ActorSystem system;
 
 	@BeforeClass
@@ -68,24 +69,24 @@ public class ExecutionVertexCancelTest {
 	// --------------------------------------------------------------------------------------------
 	//  Canceling in different states
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Test
 	public void testCancelFromCreated() {
 		try {
 			final JobVertexID jid = new JobVertexID();
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
-			
+
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
-			
+
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
-			
+
 			vertex.cancel();
-			
+
 			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
-			
+
 			assertNull(vertex.getFailureCause());
-			
+
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
@@ -95,25 +96,25 @@ public class ExecutionVertexCancelTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCancelFromScheduled() {
 		try {
 			final JobVertexID jid = new JobVertexID();
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
-			
+
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
-			
+
 			setVertexState(vertex, ExecutionState.SCHEDULED);
 			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
-			
+
 			vertex.cancel();
-			
+
 			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
-			
+
 			assertNull(vertex.getFailureCause());
-			
+
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
@@ -123,7 +124,7 @@ public class ExecutionVertexCancelTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCancelConcurrentlyToDeploying_CallsNotOvertaking() {
 		new JavaTestKit(system){{
@@ -148,7 +149,7 @@ public class ExecutionVertexCancelTest {
 						new TaskOperationResult(execId, false))));
 
 				Instance instance = getInstance(taskManager);
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 				vertex.deployToSlot(slot);
 
@@ -187,15 +188,17 @@ public class ExecutionVertexCancelTest {
 				assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 				assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
 				assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
-			}catch(Exception e){
+			}
+			catch(Exception e) {
 				e.printStackTrace();
 				fail(e.getMessage());
-			}finally{
+			}
+			finally {
 				TestingUtils.setGlobalExecutionContext();
 			}
 		}};
 	}
-	
+
 	@Test
 	public void testCancelConcurrentlyToDeploying_CallsOvertaking() {
 		new JavaTestKit(system){
@@ -272,7 +275,7 @@ public class ExecutionVertexCancelTest {
 			}
 		};
 	}
-	
+
 	@Test
 	public void testCancelFromRunning() {
 		new JavaTestKit(system) {
@@ -291,7 +294,7 @@ public class ExecutionVertexCancelTest {
 									TaskOperationResult(execId, true))));
 
 					Instance instance = getInstance(taskManager);
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+					SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 					setVertexState(vertex, ExecutionState.RUNNING);
 					setVertexResource(vertex, slot);
@@ -319,7 +322,7 @@ public class ExecutionVertexCancelTest {
 			}
 		};
 	}
-	
+
 	@Test
 	public void testRepeatedCancelFromRunning() {
 		new JavaTestKit(system) {
@@ -339,7 +342,7 @@ public class ExecutionVertexCancelTest {
 							TaskOperationResult(execId, true))));
 
 					Instance instance = getInstance(taskManager);
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+					SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 					setVertexState(vertex, ExecutionState.RUNNING);
 					setVertexResource(vertex, slot);
@@ -375,7 +378,7 @@ public class ExecutionVertexCancelTest {
 			}
 		};
 	}
-	
+
 	@Test
 	public void testCancelFromRunningDidNotFindTask() {
 		// this may happen when the task finished or failed while the call was in progress
@@ -395,7 +398,7 @@ public class ExecutionVertexCancelTest {
 							TaskOperationResult(execId, false))));
 
 					Instance instance = getInstance(taskManager);
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+					SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 					setVertexState(vertex, ExecutionState.RUNNING);
 					setVertexResource(vertex, slot);
@@ -419,7 +422,7 @@ public class ExecutionVertexCancelTest {
 			}
 		};
 	}
-	
+
 	@Test
 	public void testCancelCallFails() {
 		new JavaTestKit(system) {
@@ -436,7 +439,7 @@ public class ExecutionVertexCancelTest {
 							CancelSequenceTaskManagerCreator()));
 
 					Instance instance = getInstance(taskManager);
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+					SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 					setVertexState(vertex, ExecutionState.RUNNING);
 					setVertexResource(vertex, slot);
@@ -463,7 +466,7 @@ public class ExecutionVertexCancelTest {
 			}
 		};
 	}
-	
+
 	@Test
 	public void testSendCancelAndReceiveFail() {
 		new JavaTestKit(system) {
@@ -482,7 +485,7 @@ public class ExecutionVertexCancelTest {
 							)));
 
 					Instance instance = getInstance(taskManager);
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+					SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 					setVertexState(vertex, ExecutionState.RUNNING);
 					setVertexResource(vertex, slot);
@@ -507,11 +510,11 @@ public class ExecutionVertexCancelTest {
 			}
 		};
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Actions after a vertex has been canceled or while canceling
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Test
 	public void testScheduleOrDeployAfterCancel() {
 		try {
@@ -521,26 +524,26 @@ public class ExecutionVertexCancelTest {
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 			setVertexState(vertex, ExecutionState.CANCELED);
-			
+
 			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
-			
+
 			// 1)
 			// scheduling after being canceled should be tolerated (no exception) because
 			// it can occur as the result of races
 			{
 				Scheduler scheduler = mock(Scheduler.class);
 				vertex.scheduleForExecution(scheduler, false);
-				
+
 				assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 			}
-			
+
 			// 2)
 			// deploying after canceling from CREATED needs to raise an exception, because
 			// the scheduler (or any caller) needs to know that the slot should be released
 			try {
 				Instance instance = getInstance(ActorRef.noSender());
 				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-				
+
 				vertex.deployToSlot(slot);
 				fail("Method should throw an exception");
 			}
@@ -553,60 +556,62 @@ public class ExecutionVertexCancelTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testActionsWhileCancelling() {
-		
+
 		try {
 			final JobVertexID jid = new JobVertexID();
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
-			
+
 			// scheduling while canceling is an illegal state transition
 			try {
 				ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 						AkkaUtils.getDefaultTimeout());
 				setVertexState(vertex, ExecutionState.CANCELING);
-				
+
 				Scheduler scheduler = mock(Scheduler.class);
 				vertex.scheduleForExecution(scheduler, false);
 			}
 			catch (Exception e) {
 				fail("should not throw an exception");
 			}
-			
-			
+
+
 			// deploying while in canceling state is illegal (should immediately go to canceled)
 			try {
 				ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 						AkkaUtils.getDefaultTimeout());
 				setVertexState(vertex, ExecutionState.CANCELING);
-				
+
 				Instance instance = getInstance(ActorRef.noSender());
 				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-				
+
 				vertex.deployToSlot(slot);
 				fail("Method should throw an exception");
 			}
-			catch (IllegalStateException e) {}
-			
-			
+			catch (IllegalStateException e) {
+				// that is what we expect
+			}
+
+
 			// fail while canceling
 			{
 				ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 						AkkaUtils.getDefaultTimeout());
-				
+
 				Instance instance = getInstance(ActorRef.noSender());
 				SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-				
+
 				setVertexResource(vertex, slot);
 				setVertexState(vertex, ExecutionState.CANCELING);
-				
+
 				Exception failureCause = new Exception("test exception");
-				
+
 				vertex.fail(failureCause);
 				assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 				assertEquals(failureCause, vertex.getFailureCause());
-				
+
 				assertTrue(slot.isReleased());
 			}
 		}
@@ -651,4 +656,4 @@ public class ExecutionVertexCancelTest {
 			}
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index c08ae01..eadf328 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -28,19 +28,21 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ExecutionVertexDeploymentTest {
+
 	private static ActorSystem system;
 
 	@BeforeClass
@@ -61,41 +63,43 @@ public class ExecutionVertexDeploymentTest {
 			TestingUtils.setCallingThreadDispatcher(system);
 			ActorRef tm = TestActorRef.create(system, Props.create(SimpleAcknowledgingTaskManager
 					.class));
-			
+
+			final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
 			// mock taskmanager to simply accept the call
 			Instance instance = getInstance(tm);
+			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
-			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-			
-			final ExecutionJobVertex ejv = getExecutionVertex(jid);
-			
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
-			
+
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			vertex.deployToSlot(slot);
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
-			
+
 			// no repeated scheduling
 			try {
 				vertex.deployToSlot(slot);
 				fail("Scheduled from wrong state");
 			}
-			catch (IllegalStateException e) {}
-			
+			catch (IllegalStateException e) {
+				// as expected
+			}
+
 			assertNull(vertex.getFailureCause());
-			
+
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		}finally{
+		}
+		finally {
 			TestingUtils.setGlobalExecutionContext();
 		}
 	}
-	
+
 	@Test
 	public void testDeployWithSynchronousAnswer() {
 		try {
@@ -105,31 +109,32 @@ public class ExecutionVertexDeploymentTest {
 
 			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleAcknowledgingTaskManager.class));
-			
-			final Instance instance = getInstance(simpleTaskManager);
-			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-			
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
-			
+
+			final Instance instance = getInstance(simpleTaskManager);
+			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
-			
+
 			vertex.deployToSlot(slot);
-			
+
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
-			
+
 			// no repeated scheduling
 			try {
 				vertex.deployToSlot(slot);
 				fail("Scheduled from wrong state");
 			}
-			catch (IllegalStateException e) {}
-			
+			catch (IllegalStateException e) {
+				// as expected
+			}
+
 			assertNull(vertex.getFailureCause());
-			
+
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
@@ -137,48 +142,51 @@ public class ExecutionVertexDeploymentTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		}finally{
+		}
+		finally {
 			TestingUtils.setGlobalExecutionContext();
 		}
 	}
-	
+
 	@Test
 	public void testDeployWithAsynchronousAnswer() {
 		try {
 			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
 
 			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleAcknowledgingTaskManager.class));
-			
-			final Instance instance = getInstance(simpleTaskManager);
 
-			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-			
-			final ExecutionJobVertex ejv = getExecutionVertex(jid);
-			
-			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-					AkkaUtils.getDefaultTimeout());
+			final Instance instance = getInstance(simpleTaskManager);
+			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
-			
+
 			vertex.deployToSlot(slot);
-			
+
 			// no repeated scheduling
 			try {
 				vertex.deployToSlot(slot);
 				fail("Scheduled from wrong state");
 			}
-			catch (IllegalStateException e) {}
+			catch (IllegalStateException e) {
+				// as expected
+			}
 
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
-			
+
 			// no repeated scheduling
 			try {
 				vertex.deployToSlot(slot);
 				fail("Scheduled from wrong state");
 			}
-			catch (IllegalStateException e) {}
-			
+			catch (IllegalStateException e) {
+				// as expected
+			}
+
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
@@ -188,33 +196,32 @@ public class ExecutionVertexDeploymentTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testDeployFailedSynchronous() {
 		try {
 			TestingUtils.setCallingThreadDispatcher(system);
 
 			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
 
 			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleFailingTaskManager.class));
-			
+
 			final Instance instance = getInstance(simpleTaskManager);
-			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-			
-			final ExecutionJobVertex ejv = getExecutionVertex(jid);
-			
-			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-					AkkaUtils.getDefaultTimeout());
+			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
-			
+
 			vertex.deployToSlot(slot);
-			
+
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 			assertNotNull(vertex.getFailureCause());
 			assertTrue(vertex.getFailureCause().getMessage().contains(ERROR_MESSAGE));
-			
+
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
@@ -222,31 +229,30 @@ public class ExecutionVertexDeploymentTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		}finally{
+		}
+		finally {
 			TestingUtils.setGlobalExecutionContext();
 		}
 	}
-	
+
 	@Test
 	public void testDeployFailedAsynchronously() {
 		try {
 			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getExecutionVertex(jid);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
 
 			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleFailingTaskManager.class));
-			
-			final Instance instance = getInstance(simpleTaskManager);
-			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
 
-			final ExecutionJobVertex ejv = getExecutionVertex(jid);
-			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-					AkkaUtils.getDefaultTimeout());
+			final Instance instance = getInstance(simpleTaskManager);
+			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
-			
+
 			vertex.deployToSlot(slot);
-			
+
 			// wait until the state transition must be done
 			for (int i = 0; i < 100; i++) {
 				if (vertex.getExecutionState() == ExecutionState.FAILED && vertex.getFailureCause() != null) {
@@ -255,11 +261,11 @@ public class ExecutionVertexDeploymentTest {
 					Thread.sleep(10);
 				}
 			}
-			
+
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 			assertNotNull(vertex.getFailureCause());
 			assertTrue(vertex.getFailureCause().getMessage().contains(ERROR_MESSAGE));
-			
+
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
@@ -269,11 +275,15 @@ public class ExecutionVertexDeploymentTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testFailExternallyDuringDeploy() {
 		try {
 			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
 
 			final ActionQueue queue = new ActionQueue();
 			final TestingUtils.QueuedActionExecutionContext ec = new TestingUtils
@@ -284,12 +294,7 @@ public class ExecutionVertexDeploymentTest {
 			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleAcknowledgingTaskManager.class));
 			final Instance instance = getInstance(simpleTaskManager);
-			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
-			final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
-			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-					AkkaUtils.getDefaultTimeout());
+			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			vertex.deployToSlot(slot);
@@ -306,17 +311,19 @@ public class ExecutionVertexDeploymentTest {
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
-		} catch (Exception e) {
+		}
+		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		}finally{
+		}
+		finally {
 			TestingUtils.setGlobalExecutionContext();
 		}
 	}
-	
+
 	@Test
 	public void testFailCallOvertakesDeploymentAnswer() {
-		
+
 		try {
 			ActionQueue queue = new ActionQueue();
 			TestingUtils.QueuedActionExecutionContext context = new TestingUtils
@@ -329,6 +336,7 @@ public class ExecutionVertexDeploymentTest {
 			final ExecutionJobVertex ejv = getExecutionVertex(jid);
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
+
 			final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId();
 
 			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system, Props.create(new
@@ -336,16 +344,16 @@ public class ExecutionVertexDeploymentTest {
 					TaskOperationResult(eid, false), new TaskOperationResult(eid, true))));
 
 			final Instance instance = getInstance(simpleTaskManager);
-			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
 			vertex.deployToSlot(slot);
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
-			
+
 			Exception testError = new Exception("test error");
 			vertex.fail(testError);
-			
+
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 
 			// cancel call overtakes deploy call
@@ -361,7 +369,7 @@ public class ExecutionVertexDeploymentTest {
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 
 			assertEquals(testError, vertex.getFailureCause());
-			
+
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
@@ -371,8 +379,9 @@ public class ExecutionVertexDeploymentTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		}finally{
+		}
+		finally {
 			TestingUtils.setGlobalExecutionContext();
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 06f0e9d..1e9c30b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -28,17 +28,17 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
-
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -46,6 +46,7 @@ import org.junit.Test;
 import org.mockito.Matchers;
 
 public class ExecutionVertexSchedulingTest {
+
 	private static ActorSystem system;
 
 	@BeforeClass
@@ -58,93 +59,91 @@ public class ExecutionVertexSchedulingTest {
 		JavaTestKit.shutdownActorSystem(system);
 		system = null;
 	}
-	
+
 	@Test
 	public void testSlotReleasedWhenScheduledImmediately() {
-		
 		try {
-			// a slot than cannot be deployed to
-			final Instance instance = getInstance(ActorRef.noSender());
-			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-			slot.cancel();
-			assertFalse(slot.isReleased());
-			
 			final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID());
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
+
+			// a slot that cannot be deployed to
+			final Instance instance = getInstance(ActorRef.noSender());
+			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 			
+			slot.releaseSlot();
+			assertTrue(slot.isReleased());
+
 			Scheduler scheduler = mock(Scheduler.class);
 			when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
-			
+
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
 			vertex.scheduleForExecution(scheduler, false);
-			
+
 			// will have failed
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
-			assertTrue(slot.isReleased());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSlotReleasedWhenScheduledQueued() {
-
 		try {
-			// a slot than cannot be deployed to
-			final Instance instance = getInstance(ActorRef.noSender());
-			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-			slot.cancel();
-			assertFalse(slot.isReleased());
-			
-			final SlotAllocationFuture future = new SlotAllocationFuture();
-			
 			final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID());
 			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
 					AkkaUtils.getDefaultTimeout());
-			
+
+			// a slot that cannot be deployed to
+			final Instance instance = getInstance(ActorRef.noSender());
+			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+
+			slot.releaseSlot();
+			assertTrue(slot.isReleased());
+
+			final SlotAllocationFuture future = new SlotAllocationFuture();
+
 			Scheduler scheduler = mock(Scheduler.class);
 			when(scheduler.scheduleQueued(Matchers.any(ScheduledUnit.class))).thenReturn(future);
-			
+
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
 			vertex.scheduleForExecution(scheduler, true);
-			
+
 			// future has not yet a slot
 			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
-			
+
 			future.setSlot(slot);
-			
+
 			// will have failed
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
-			assertTrue(slot.isReleased());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testScheduleToDeploying() {
 		try {
+			final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID());
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+					AkkaUtils.getDefaultTimeout());
+
 			TestingUtils.setCallingThreadDispatcher(system);
 			ActorRef tm = TestActorRef.create(system, Props.create(ExecutionGraphTestUtils
 					.SimpleAcknowledgingTaskManager.class));
 
 			final Instance instance = getInstance(tm);
-			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-			
-			final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID());
-			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-					AkkaUtils.getDefaultTimeout());
-			
+			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+
 			Scheduler scheduler = mock(Scheduler.class);
 			when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
-			
+
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
 			// try to deploy to the slot
@@ -158,4 +157,4 @@ public class ExecutionVertexSchedulingTest {
 			TestingUtils.setGlobalExecutionContext();
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index c0ed629..595ac7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -38,57 +38,52 @@ public class InstanceTest {
 			HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 			InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
-			
+
 			Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 4);
-			
+
 			assertEquals(4, instance.getTotalNumberOfSlots());
 			assertEquals(4, instance.getNumberOfAvailableSlots());
 			assertEquals(0, instance.getNumberOfAllocatedSlots());
-			
+
 			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
 			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
 			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
 			SimpleSlot slot4 = instance.allocateSimpleSlot(new JobID());
-			
+
 			assertNotNull(slot1);
 			assertNotNull(slot2);
 			assertNotNull(slot3);
 			assertNotNull(slot4);
-			
+
 			assertEquals(0, instance.getNumberOfAvailableSlots());
 			assertEquals(4, instance.getNumberOfAllocatedSlots());
-			assertEquals(6, slot1.getSlotNumber() + slot2.getSlotNumber() + 
+			assertEquals(6, slot1.getSlotNumber() + slot2.getSlotNumber() +
 					slot3.getSlotNumber() + slot4.getSlotNumber());
-			
+
 			// no more slots
 			assertNull(instance.allocateSimpleSlot(new JobID()));
 			try {
 				instance.returnAllocatedSlot(slot2);
 				fail("instance accepted a non-cancelled slot.");
-			} catch (IllegalArgumentException e) {
+			}
+			catch (IllegalArgumentException e) {
 				// good
 			}
-			
+
 			// release the slots. this returns them to the instance
 			slot1.releaseSlot();
 			slot2.releaseSlot();
-			assertFalse(instance.returnAllocatedSlot(slot1));
-			assertFalse(instance.returnAllocatedSlot(slot2));
-			
-			// cancel some slots. this does not release them, yet
-			slot3.cancel();
-			slot4.cancel();
-			assertTrue(instance.returnAllocatedSlot(slot3));
-			assertTrue(instance.returnAllocatedSlot(slot4));
-			
+			slot3.releaseSlot();
+			slot4.releaseSlot();
+
 			assertEquals(4, instance.getNumberOfAvailableSlots());
 			assertEquals(0, instance.getNumberOfAllocatedSlots());
-			
+
 			assertFalse(instance.returnAllocatedSlot(slot1));
 			assertFalse(instance.returnAllocatedSlot(slot2));
 			assertFalse(instance.returnAllocatedSlot(slot3));
 			assertFalse(instance.returnAllocatedSlot(slot4));
-			
+
 			assertEquals(4, instance.getNumberOfAvailableSlots());
 			assertEquals(0, instance.getNumberOfAllocatedSlots());
 		}
@@ -97,27 +92,27 @@ public class InstanceTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testInstanceDies() {
 		try {
 			HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 			InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
-			
+
 			Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 3);
-			
+
 			assertEquals(3, instance.getNumberOfAvailableSlots());
-			
+
 			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
 			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
 			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
-			
+
 			instance.markDead();
-			
+
 			assertEquals(0, instance.getNumberOfAllocatedSlots());
 			assertEquals(0, instance.getNumberOfAvailableSlots());
-			
+
 			assertTrue(slot1.isCanceled());
 			assertTrue(slot2.isCanceled());
 			assertTrue(slot3.isCanceled());
@@ -127,26 +122,26 @@ public class InstanceTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCancelAllSlots() {
 		try {
 			HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 			InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
-			
+
 			Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 3);
-			
+
 			assertEquals(3, instance.getNumberOfAvailableSlots());
-			
+
 			SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
 			SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
 			SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
-			
+
 			instance.cancelAndReleaseAllSlots();
-			
+
 			assertEquals(3, instance.getNumberOfAvailableSlots());
-			
+
 			assertTrue(slot1.isCanceled());
 			assertTrue(slot2.isCanceled());
 			assertTrue(slot3.isCanceled());
@@ -156,7 +151,7 @@ public class InstanceTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	/**
 	 * It is crucial for some portions of the code that instance objects do not override equals and
 	 * are only considered equal, if the references are equal.
@@ -172,4 +167,4 @@ public class InstanceTest {
 			fail(e.getMessage());
 		}
 	}
-}
+}
\ No newline at end of file