Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:19 UTC

[55/63] [abbrv] git commit: Make Co-Location constraints resilient against out-of-order scheduling and deeply integrate them with slot sharing. Fix miscellaneous checkstyle errors/warnings

Make Co-Location constraints resilient against out-of-order scheduling and deeply integrate them with slot sharing.
Fix miscellaneous checkstyle errors/warnings.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cdee8750
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cdee8750
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cdee8750

Branch: refs/heads/master
Commit: cdee87501762c092c216adf35fceeea339e0c4c4
Parents: 8e7216a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 17 05:22:24 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:21:01 2014 +0200

----------------------------------------------------------------------
 .../flink/client/CliFrontendListCancelTest.java |   7 +-
 .../plantranslate/NepheleJobGraphGenerator.java |   7 +-
 .../flink/runtime/jobmanager/JobManager.java    |   7 +-
 .../scheduler/CoLocationConstraint.java         |  46 ++-
 .../jobmanager/scheduler/CoLocationGroup.java   |  10 +-
 .../runtime/jobmanager/scheduler/Scheduler.java | 102 ++----
 .../jobmanager/scheduler/SharedSlot.java        |  93 +++---
 .../scheduler/SlotSharingGroupAssignment.java   | 258 ++++++++++-----
 .../runtime/jobmanager/scheduler/SubSlot.java   |  12 +-
 .../protocols/ExtendedManagementProtocol.java   |   4 +-
 .../runtime/jobgraph/JobManagerTestUtils.java   |   4 +-
 .../runtime/jobmanager/JobManagerITCase.java    |  14 +-
 .../ScheduleWithCoLocationHintTest.java         | 322 ++++++++++++++++++-
 .../scheduler/SchedulerTestUtils.java           |   1 +
 .../jobmanager/scheduler/SharedSlotsTest.java   |  32 +-
 .../runtime/testutils/ServerTestUtils.java      |   2 +-
 16 files changed, 655 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 2f31181..40d10b6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -183,7 +183,12 @@ public class CliFrontendListCancelTest {
 		}
 
 		@Override
-		public int getAvailableSlots() {
+		public int getTotalNumberOfRegisteredSlots() {
+			return 1;
+		}
+
+		@Override
+		public int getNumberOfSlotsAvailableToScheduler() throws IOException {
 			return 1;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index a3fef17..a224324 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -493,11 +493,10 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			
 			if (this.currentIteration != null) {
 				AbstractJobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
-				if (head == null) {
-					throw new CompilerException("Found no iteration head task in the postVisit of translating a task inside an iteration");
+				// the head may still be null if we descend into the static parts first
+				if (head != null) {
+					targetVertex.setStrictlyCoLocatedWith(head);
 				}
-				
-				targetVertex.setStrictlyCoLocatedWith(head);
 			}
 			
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index f79fecb..1c02127 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -574,10 +574,15 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	}
 
 	@Override
-	public int getAvailableSlots() {
+	public int getTotalNumberOfRegisteredSlots() {
 		return getInstanceManager().getTotalNumberOfSlots();
 	}
 	
+	@Override
+	public int getNumberOfSlotsAvailableToScheduler() {
+		return scheduler.getNumberOfAvailableSlots();
+	}
+	
 	/**
 	 * Starts the Jetty Infoserver for the Jobmanager
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index 64f7ffc..89b644b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -18,30 +18,52 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.instance.Instance;
 
+import com.google.common.base.Preconditions;
+
 public class CoLocationConstraint {
 	
-	private volatile Instance location;
+	private final CoLocationGroup group;
 	
+	private volatile SharedSlot sharedSlot;
 	
-	public void setLocation(Instance location) {
-		if (location == null) {
-			throw new IllegalArgumentException();
-		}
-		
-		if (this.location == null) {
-			this.location = location;
+	
+	CoLocationConstraint(CoLocationGroup group) {
+		Preconditions.checkNotNull(group);
+		this.group = group;
+	}
+	
+	
+	public SharedSlot getSharedSlot() {
+		return sharedSlot;
+	}
+	
+	public Instance getLocation() {
+		if (sharedSlot != null) {
+			return sharedSlot.getAllocatedSlot().getInstance();
 		} else {
-			throw new IllegalStateException("The constraint has already been assigned a location.");
+			throw new IllegalStateException("Not assigned");
 		}
 	}
 	
-	public Instance getLocation() {
-		return location;
+	public void setSharedSlot(SharedSlot sharedSlot) {
+		if (this.sharedSlot == sharedSlot) {
+			return;
+		}
+		else if (this.sharedSlot == null || this.sharedSlot.isDisposed()) {
+			this.sharedSlot = sharedSlot;
+		} else {
+			throw new IllegalStateException("Overriding shared slot that is still alive.");
+		}
 	}
 	
 	public boolean isUnassigned() {
-		return this.location == null;
+		return this.sharedSlot == null;
+	}
+	
+	public AbstractID getGroupId() {
+		return this.group.getId();
 	}
 }
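
The constraint now tracks the SharedSlot it lives in rather than a bare
Instance, so it can outlive the release of that slot and later re-attach to a
replacement slot on the same machine. A minimal lifecycle sketch (same-package
code, since the constructor is package-private; slotA and slotB stand in for
shared slots handed out by the assignment and are hypothetical here):

    CoLocationGroup group = new CoLocationGroup();
    CoLocationConstraint constraint = new CoLocationConstraint(group);

    // fresh constraint: no slot attached yet
    assert constraint.isUnassigned();

    // the first successful placement attaches a shared slot
    constraint.setSharedSlot(slotA);
    Instance location = constraint.getLocation(); // instance hosting slotA

    // while slotA is alive, attaching a different slot is an error:
    // constraint.setSharedSlot(slotB);           // throws IllegalStateException

    // once slotA is disposed, a replacement may be attached (the scheduler
    // only does this with a slot on the same instance):
    // constraint.setSharedSlot(slotB);           // legal after slotA.isDisposed()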

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
index 2398334..84692a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 
 import com.google.common.base.Preconditions;
@@ -29,6 +30,9 @@ public class CoLocationGroup implements java.io.Serializable {
 	
 	private static final long serialVersionUID = -2605819490401895297L;
 
+	// this id is used in place of a job vertex ID: the co-location group acts as a unit
+	// inside which slots are shared exclusively
+	private final AbstractID id = new AbstractID();
 	
 	private final List<AbstractJobVertex> vertices = new ArrayList<AbstractJobVertex>();
 	
@@ -80,8 +84,12 @@ public class CoLocationGroup implements java.io.Serializable {
 		if (num > constraints.size()) {
 			constraints.ensureCapacity(num);
 			for (int i = constraints.size(); i < num; i++) {
-				constraints.add(new CoLocationConstraint());
+				constraints.add(new CoLocationConstraint(this));
 			}
 		}
 	}
+	
+	public AbstractID getId() {
+		return id;
+	}
 }
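
All constraints of one group share the group's AbstractID, which replaces the
JobVertexID as the bookkeeping key in the SlotSharingGroupAssignment, so tasks
under the same co-location group draw from a single exclusive sub-slot pool.
A small illustration (hypothetical, same package):

    CoLocationGroup grp = new CoLocationGroup();
    CoLocationConstraint a = new CoLocationConstraint(grp);
    CoLocationConstraint b = new CoLocationConstraint(grp);

    // both constraints are booked under the same key, which is a plain
    // AbstractID and therefore distinguishable from any JobVertexID
    // (the release path in SlotSharingGroupAssignment relies on this)
    assert a.getGroupId() == b.getGroupId();
    assert a.getGroupId().getClass() == AbstractID.class;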

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index b9f83fc..20e4d17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -41,7 +41,7 @@ import org.apache.flink.util.ExceptionUtils;
  */
 public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 
-	private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
+	static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
 	
 	
 	private final Object globalLock = new Object();
@@ -150,56 +150,6 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 	
 		synchronized (globalLock) {
-			
-//			// 1)  === If the task has a strict co-schedule hint, obey it ===
-//			
-//			CoLocationConstraint locationConstraint = task.getLocationConstraint();
-//			if (locationConstraint != null) {
-//				// location constraints can never be scheduled in a queued fashion
-//				if (queueIfNoResource) {
-//					throw new IllegalArgumentException("A task with a location constraint was scheduled in a queued fashion.");
-//				}
-//				
-//				// since we are inside the global lock scope, we can check, allocate, and assign
-//				// in one atomic action. however, slots may die and be deallocated
-//				
-//				// (a) is the constraint has not yet has a slot, get one
-//				if (locationConstraint.isUnassigned()) {
-//					// try and get a slot
-//					AllocatedSlot newSlot = getFreeSlotForTask(vertex);
-//					if (newSlot == null) {
-//						throw new NoResourceAvailableException();
-//					}
-//					SharedSlot sl = locationConstraint.swapInNewSlot(newSlot);
-//					SubSlot slot = sl.allocateSubSlot(vertex.getJobvertexId());
-//					
-//					updateLocalityCounters(newSlot.getLocality());
-//					return slot;
-//				}
-//				else {
-//					// try to get a subslot. returns null, if the location's slot has been released
-//					// in the meantime
-//					SubSlot slot = locationConstraint.allocateSubSlot(vertex.getJobvertexId());
-//					if (slot == null) {
-//						// get a new slot. at the same instance!!!
-//						Instance location = locationConstraint.getSlot().getAllocatedSlot().getInstance();
-//						AllocatedSlot newSlot;
-//						try {
-//							newSlot = location.allocateSlot(vertex.getJobId());
-//						} catch (InstanceDiedException e) {
-//							throw new NoResourceAvailableException("The instance of the required location died.");
-//						}
-//						if (newSlot == null) {
-//							throw new NoResourceAvailableException();
-//						}
-//						SharedSlot sharedSlot = locationConstraint.swapInNewSlot(newSlot);
-//						slot = sharedSlot.allocateSubSlot(vertex.getJobvertexId());
-//					}
-//					
-//					updateLocalityCounters(Locality.LOCAL);
-//					return slot;
-//				}
-//			}
 		
 			// 1)  === If the task has a slot sharing group, schedule with shared slots ===
 			
@@ -213,18 +163,17 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 				final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
 				final CoLocationConstraint constraint = task.getLocationConstraint();
 				
-				AllocatedSlot newSlot = null;
-				
-				// get a slot from the group. obey location constraints, if existing and assigned
-				AllocatedSlot slotFromGroup;
-				if (constraint == null || constraint.isUnassigned()) {
-					slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), vertex);
+				// get a slot from the group, if the group has one for us (and can fulfill the constraint)
+				SubSlot slotFromGroup;
+				if (constraint == null) {
+					slotFromGroup = assignment.getSlotForTask(vertex);
 				}
 				else {
-					// this returns null, if the constraint cannot be fulfilled
-					slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), constraint);
+					slotFromGroup = assignment.getSlotForTask(vertex, constraint);
 				}
 				
+				AllocatedSlot newSlot = null;
+				
 				// the following needs to make sure any allocated slot is released in case of an error
 				try {
 					
@@ -232,12 +181,6 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 					if (slotFromGroup != null) {
 						// local (or unconstrained in the current group)
 						if (slotFromGroup.getLocality() != Locality.NON_LOCAL) {
-							
-							// attach to the locality constraint
-							if (constraint != null && constraint.isUnassigned()) {
-								constraint.setLocation(slotFromGroup.getInstance());
-							}
-							
 							updateLocalityCounters(slotFromGroup.getLocality());
 							return slotFromGroup;
 						}
@@ -249,13 +192,19 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 					// get a new slot, since we could not place it into the group, or we could not place it locally
 					newSlot = getFreeSlotForTask(vertex, locations);
 					
-					AllocatedSlot toUse;
+					SubSlot toUse;
 					
 					if (newSlot == null) {
 						if (slotFromGroup == null) {
 							// both null
-							throw new NoResourceAvailableException();
+							if (constraint == null || constraint.isUnassigned()) {
+								throw new NoResourceAvailableException();
+							} else {
+								throw new NoResourceAvailableException("Could not allocate a slot on instance " + 
+											constraint.getLocation() + ", as required by the co-location constraint.");
+							}
 						} else {
+							// got a non-local from the group, and no new one
 							toUse = slotFromGroup;
 						}
 					}
@@ -265,17 +214,28 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 							slotFromGroup.releaseSlot();
 						}
 						
-						toUse = sharingUnit.getTaskAssignment().addSlotWithTask(newSlot, task.getJobVertexId());
+						if (constraint == null) {
+							toUse = assignment.addNewSlotWithTask(newSlot, vertex);
+						} else {
+							toUse = assignment.addNewSlotWithTask(newSlot, vertex, constraint);
+						}
 					}
 					else {
-						// both are available and potentially usable
+						// both are available and usable. neither is local
 						newSlot.releaseSlot();
 						toUse = slotFromGroup;
 					}
 					
 					// assign to the co-location hint, if we have one and it is unassigned
-					if (constraint != null && constraint.isUnassigned()) {
-						constraint.setLocation(toUse.getInstance());
+					// if it was assigned before and the new one is not local, it is a fail
+					if (constraint != null) {
+						if (constraint.isUnassigned() || toUse.getLocality() == Locality.LOCAL) {
+							constraint.setSharedSlot(toUse.getSharedSlot());
+						} else {
+							// constraint already assigned and the placement is not local: fail
+							throw new NoResourceAvailableException("Could not allocate a slot on instance " + 
+									constraint.getLocation() + ", as required by the co-location constraint.");
+						}
 					}
 					
 					updateLocalityCounters(toUse.getLocality());
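
Condensed, the rewritten slot-sharing branch works as follows (a paraphrase of
the code above, not a verbatim excerpt; error handling, the release of the
losing candidate, and the queued-scheduling path are elided, and
pickLocalCandidate is a hypothetical helper standing in for the inline logic):

    SubSlot slotFromGroup = (constraint == null)
            ? assignment.getSlotForTask(vertex)
            : assignment.getSlotForTask(vertex, constraint);

    // a LOCAL or UNCONSTRAINED hit from the sharing group wins outright
    if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) {
        return slotFromGroup;
    }

    // otherwise try a fresh slot near the preferred locations and keep
    // whichever candidate is local, releasing the other
    AllocatedSlot newSlot = getFreeSlotForTask(vertex, locations);
    SubSlot toUse = pickLocalCandidate(slotFromGroup, newSlot);

    // the constraint binds to the chosen shared slot only while unassigned
    // or when the placement is LOCAL; a non-local placement against an
    // already-assigned constraint is a hard failure
    if (constraint != null) {
        if (constraint.isUnassigned() || toUse.getLocality() == Locality.LOCAL) {
            constraint.setSharedSlot(toUse.getSharedSlot());
        } else {
            throw new NoResourceAvailableException("Could not allocate a slot on instance "
                    + constraint.getLocation() + ", as required by the co-location constraint.");
        }
    }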

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
index 0f3687a..5d87e07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
@@ -24,7 +24,13 @@ import java.util.Set;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
-public class SharedSlot {
+/**
+ * 
+ * NOTE: This class does no synchronization by itself and its mutating
+ *       methods may only be called from within the synchronization scope of
+ *       its associated SlotSharingGroupAssignment.
+ */
+class SharedSlot {
 
 	private final AllocatedSlot allocatedSlot;
 	
@@ -48,77 +54,58 @@ public class SharedSlot {
 		this.subSlots = new HashSet<SubSlot>();
 	}
 	
-	public SharedSlot(AllocatedSlot allocatedSlot) {
-		if (allocatedSlot == null) {
-			throw new NullPointerException();
-		}
-		
-		this.allocatedSlot = allocatedSlot;
-		this.assignmentGroup = null;;
-		this.subSlots = new HashSet<SubSlot>();
-	}
-	
 	// --------------------------------------------------------------------------------------------
 	
-	public AllocatedSlot getAllocatedSlot() {
+	AllocatedSlot getAllocatedSlot() {
 		return this.allocatedSlot;
 	}
 	
-	public boolean isDisposed() {
+	boolean isDisposed() {
 		return disposed;
 	}
 	
-	public int getNumberOfAllocatedSubSlots() {
-		synchronized (this.subSlots) {
-			return this.subSlots.size();
-		}
+	int getNumberOfAllocatedSubSlots() {
+		return this.subSlots.size();
 	}
 	
-	public SubSlot allocateSubSlot(JobVertexID jid) {
-		synchronized (this.subSlots) {
-			if (isDisposed()) {
-				return null;
-			} else {
-				SubSlot ss = new SubSlot(this, subSlotNumber++, jid);
-				this.subSlots.add(ss);
-				return ss;
-			}
+	SubSlot allocateSubSlot(JobVertexID jid) {
+		if (disposed) {
+			return null;
+		} else {
+			SubSlot ss = new SubSlot(this, subSlotNumber++, jid);
+			this.subSlots.add(ss);
+			return ss;
 		}
 	}
 	
-	public void rease() {
-		synchronized (this.subSlots) {
-			disposed = true;
-			for (SubSlot ss : subSlots) {
-				ss.releaseSlot();
-			}
+	void returnAllocatedSlot(SubSlot slot) {
+		if (!slot.isReleased()) {
+			throw new IllegalArgumentException("SubSlot is not released.");
 		}
 		
-		allocatedSlot.releaseSlot();
+		this.assignmentGroup.releaseSubSlot(slot, this);
 	}
 	
-	void returnAllocatedSlot(SubSlot slot) {
-		boolean release;
-		
-		synchronized (this.subSlots) {
-			if (!this.subSlots.remove(slot)) {
-				throw new IllegalArgumentException("Wrong shared slot for subslot.");
-			}
-			
-			if (assignmentGroup != null) {
-				release = assignmentGroup.sharedSlotAvailableForJid(this, slot.getJobVertexId(), this.subSlots.isEmpty());
-			} else {
-				release = subSlots.isEmpty();
-			}
-			
-			if (release) {
-				disposed = true;
-			}
+	int releaseSlot(SubSlot slot) {
+		if (!this.subSlots.remove(slot)) {
+			throw new IllegalArgumentException("Wrong shared slot for subslot.");
 		}
-		
-		// do this call outside the lock, because releasing the allocated slot may go into further scheduler calls
-		if (release) {
+		return subSlots.size();
+	}
+	
+	void dispose() {
+		if (subSlots.isEmpty()) {
+			disposed = true;
 			this.allocatedSlot.releaseSlot();
+		} else {
+			throw new IllegalStateException("Cannot dispose while subslots are still alive.");
 		}
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return "Shared " + allocatedSlot.toString();
+	}
 }
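
SharedSlot no longer synchronizes on its own sub-slot set; every mutating call
is funneled through the owning SlotSharingGroupAssignment, which takes a single
private lock. The release path introduced by this commit (presumably entered
via SubSlot.releaseSlot(), which is unchanged in this diff) runs roughly:

    // caller side, e.g. from the sub slot:
    sharedSlot.returnAllocatedSlot(subSlot);      // asserts subSlot.isReleased()

    // which delegates to the assignment, under its lock:
    //   assignmentGroup.releaseSubSlot(subSlot, sharedSlot)
    //     -> sharedSlot.releaseSlot(subSlot)     // returns remaining count
    //     -> 0 remaining: sharedSlot.dispose()   // frees the underlying slot
    //        otherwise:   slot re-enters the per-group availability multi-map

The removed code had to trigger the final release outside its per-slot lock
because releasing may re-enter the scheduler (see the removed comment above);
the new design replaces that two-lock dance with the assignment's single lock.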

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
index 474fdbe..68d3888 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
@@ -28,34 +28,53 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.slf4j.Logger;
+
 
 public class SlotSharingGroupAssignment {
 	
+	private static final Logger LOG = Scheduler.LOG;
+	
+	private final Object lock = new Object();
+	
 	/** All slots currently allocated to this sharing group */
 	private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
 	
 	/** The slots available per vertex type (jid), keyed by instance, to make them locatable */
-	private final Map<JobVertexID, Map<Instance, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<JobVertexID, Map<Instance, List<SharedSlot>>>();
+	private final Map<AbstractID, Map<Instance, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<AbstractID, Map<Instance, List<SharedSlot>>>();
 	
 	
 	// --------------------------------------------------------------------------------------------
 	
 	
-	public SubSlot addSlotWithTask(AllocatedSlot slot, JobVertexID jid) {
+	public SubSlot addNewSlotWithTask(AllocatedSlot slot, ExecutionVertex vertex) {
+		JobVertexID id = vertex.getJobvertexId();
+		return addNewSlotWithTask(slot, id, id);
+	}
+	
+	public SubSlot addNewSlotWithTask(AllocatedSlot slot, ExecutionVertex vertex, CoLocationConstraint constraint) {
+		AbstractID groupId = constraint.getGroupId();
+		return addNewSlotWithTask(slot, groupId, null);
+	}
+	
+	private SubSlot addNewSlotWithTask(AllocatedSlot slot, AbstractID groupId, JobVertexID vertexId) {
 		
 		final SharedSlot sharedSlot = new SharedSlot(slot, this);
 		final Instance location = slot.getInstance();
 		
-		synchronized (allSlots) {
+		synchronized (lock) {
 			// add to the total bookkeeping
 			allSlots.add(sharedSlot);
 			
 			// allocate us a sub slot to return
-			SubSlot subslot = sharedSlot.allocateSubSlot(jid);
+			SubSlot subslot = sharedSlot.allocateSubSlot(vertexId);
 			
 			// preserve the locality information
 			subslot.setLocality(slot.getLocality());
@@ -63,10 +82,9 @@ public class SlotSharingGroupAssignment {
 			boolean entryForNewJidExists = false;
 			
 			// let the other vertex types know about this one as well
-			
-			for (Map.Entry<JobVertexID, Map<Instance, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
+			for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
 				
-				if (entry.getKey().equals(jid)) {
+				if (entry.getKey().equals(groupId)) {
 					entryForNewJidExists = true;
 					continue;
 				}
@@ -75,9 +93,9 @@ public class SlotSharingGroupAssignment {
 				putIntoMultiMap(available, location, sharedSlot);
 			}
 			
-			// make sure an empty entry exists for this jid, if no other entry exists
+			// make sure an empty entry exists for this group, if no other entry exists
 			if (!entryForNewJidExists) {
-				availableSlotsPerJid.put(jid, new LinkedHashMap<Instance, List<SharedSlot>>());
+				availableSlotsPerJid.put(groupId, new LinkedHashMap<Instance, List<SharedSlot>>());
 			}
 			
 			return subslot;
@@ -90,105 +108,97 @@ public class SlotSharingGroupAssignment {
 	 * slots if no local slot is available. The method returns null, when no slot is available for the
 	 * given JobVertexID at all.
 	 * 
-	 * @param jid
 	 * @param vertex
 	 * 
 	 * @return A task vertex for a task with the given JobVertexID, or null, if none is available.
 	 */
-	public AllocatedSlot getSlotForTask(JobVertexID jid, ExecutionVertex vertex) {
-		synchronized (allSlots) {
-			return getSlotForTaskInternal(jid, vertex.getPreferredLocations(), false);
-		}
-	}
-	
-	
-	public AllocatedSlot getSlotForTask(JobVertexID jid, CoLocationConstraint constraint) {
-		if (constraint.isUnassigned()) {
-			throw new IllegalArgumentException("CoLocationConstraint is unassigned");
+	public SubSlot getSlotForTask(ExecutionVertex vertex) {
+		synchronized (lock) {
+			Pair<SharedSlot, Locality> p = getSlotForTaskInternal(vertex.getJobvertexId(), vertex, vertex.getPreferredLocations(), false);
+			
+			if (p != null) {
+				SharedSlot ss = p.getLeft();
+				SubSlot slot = ss.allocateSubSlot(vertex.getJobvertexId());
+				slot.setLocality(p.getRight());
+				return slot;
+			}
+			else {
+				return null;
+			}
 		}
 		
-		synchronized (allSlots) {
-			return getSlotForTaskInternal(jid, Collections.singleton(constraint.getLocation()), true);
-		}
 	}
 	
-	
-	public boolean sharedSlotAvailableForJid(SharedSlot slot, JobVertexID jid, boolean lastSubSlot) {
-		if (slot == null || jid == null) {
-			throw new NullPointerException();
-		}
+	public SubSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
 		
-		synchronized (allSlots) {
-			if (!allSlots.contains(slot)) {
-				throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
-			}
+		synchronized (lock) {
+			SharedSlot shared = constraint.getSharedSlot();
 			
-			if (lastSubSlot) {
-				// this was the last sub slot. unless there is something pending for this jid
-				// remove this from the availability list of all jids and 
-				// return that this one is good to release
-				allSlots.remove(slot);
-				
-				Instance location = slot.getAllocatedSlot().getInstance();
-				
-				for (Map.Entry<JobVertexID, Map<Instance, List<SharedSlot>>> mapEntry : availableSlotsPerJid.entrySet()) {
-					if (mapEntry.getKey().equals(jid)) {
-						continue;
-					}
+			if (shared != null && !shared.isDisposed()) {
+				// initialized and set
+				SubSlot subslot = shared.allocateSubSlot(null);
+				subslot.setLocality(Locality.LOCAL);
+				return subslot;
+			}
+			else if (shared == null) {
+				// not initialized, grab a new slot. preferred locations are defined by the vertex
+				// we only associate the slot with the constraint, if it was a local match
+				Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(), vertex, vertex.getPreferredLocations(), false);
+				if (p == null) {
+					return null;
+				} else {
+					shared = p.getLeft();
+					Locality l = p.getRight();
 					
-					Map<Instance, List<SharedSlot>> map = mapEntry.getValue();
-					List<SharedSlot> list = map.get(location);
-					if (list == null || !list.remove(slot)) {
-						throw new IllegalStateException("SharedSlot was not available to another vertex type that it was not allocated for before.");
-					}
-					if (list.isEmpty()) {
-						map.remove(location);
+					SubSlot sub = shared.allocateSubSlot(null);
+					sub.setLocality(l);
+					
+					if (l != Locality.NON_LOCAL) {
+						constraint.setSharedSlot(shared);
 					}
+					return sub;
 				}
-				
-				return true;
 			}
-			
-			Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(jid);
-			
-			// sanity check
-			if (slotsForJid == null) {
-				throw new IllegalStateException("Trying to return a slot for jid " + jid + 
-						" when available slots indicated that all slots were available.");
+			else {
+				// disposed. get a new slot on the same instance
+				Instance location = shared.getAllocatedSlot().getInstance();
+				Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(), vertex, Collections.singleton(location), true);
+				if (p == null) {
+					return null;
+				} else {
+					shared = p.getLeft();
+					constraint.setSharedSlot(shared);
+					SubSlot subslot = shared.allocateSubSlot(null);
+					subslot.setLocality(Locality.LOCAL);
+					return subslot;
+				}
 			}
-			
-			putIntoMultiMap(slotsForJid, slot.getAllocatedSlot().getInstance(), slot);
-			
-			// do not release, we are still depending on this shared slot
-			return false;
 		}
 	}
 	
-	
 	/**
 	 * NOTE: This method is not synchronized by itself, needs to be synchronized externally.
 	 * 
-	 * @param jid
 	 * @return An allocated sub slot, or {@code null}, if no slot is available.
 	 */
-	private AllocatedSlot getSlotForTaskInternal(JobVertexID jid, Iterable<Instance> preferredLocations, boolean localOnly) {
+	private Pair<SharedSlot, Locality> getSlotForTaskInternal(AbstractID groupId, ExecutionVertex vertex, Iterable<Instance> preferredLocations, boolean localOnly) {
 		if (allSlots.isEmpty()) {
 			return null;
 		}
 		
-		Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(jid);
+		Map<Instance, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
 		
-		// get the available slots for the vertex type (jid)
-		if (slotsForJid == null) {
-			// no task is yet scheduled for that jid, so all slots are available
-			slotsForJid = new LinkedHashMap<Instance, List<SharedSlot>>();
-			availableSlotsPerJid.put(jid, slotsForJid);
+		// get the available slots for the group
+		if (slotsForGroup == null) {
+			// no task is yet scheduled for that group, so all slots are available
+			slotsForGroup = new LinkedHashMap<Instance, List<SharedSlot>>();
+			availableSlotsPerJid.put(groupId, slotsForGroup);
 			
 			for (SharedSlot availableSlot : allSlots) {
-				putIntoMultiMap(slotsForJid, availableSlot.getAllocatedSlot().getInstance(), availableSlot);
+				putIntoMultiMap(slotsForGroup, availableSlot.getAllocatedSlot().getInstance(), availableSlot);
 			}
 		}
-		else if (slotsForJid.isEmpty()) {
+		else if (slotsForGroup.isEmpty()) {
 			return null;
 		}
 		
@@ -202,32 +212,102 @@ public class SlotSharingGroupAssignment {
 				// we return early anyways and skip the flag evaluation
 				didNotGetPreferred = true;
 				
-				SharedSlot slot = removeFromMultiMap(slotsForJid, location);
-				if (slot != null) {
-					SubSlot subslot = slot.allocateSubSlot(jid);
-					subslot.setLocality(Locality.LOCAL);
-					return subslot;
+				SharedSlot slot = removeFromMultiMap(slotsForGroup, location);
+				if (slot != null && !slot.isDisposed()) {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Local assignment in shared group : " + vertex + " --> " + slot);
+					}
+					
+					return new ImmutablePair<SharedSlot, Locality>(slot, Locality.LOCAL);
 				}
 			}
 		}
 		
 		// if we want only local assignments, exit now with a "not found" result
 		if (didNotGetPreferred && localOnly) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("No local assignment in shared group possible for " + vertex);
+			}
 			return null;
 		}
 		
 		// schedule the task to any available location
-		SharedSlot slot = pollFromMultiMap(slotsForJid);
-		if (slot != null) {
-			SubSlot subslot = slot.allocateSubSlot(jid);
-			subslot.setLocality(didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
-			return subslot;
+		SharedSlot slot = pollFromMultiMap(slotsForGroup);
+		if (slot != null && !slot.isDisposed()) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug((didNotGetPreferred ? "Non-local" : "Unconstrained") + " assignment in shared group : " + vertex + " --> " + slot);
+			}
+			
+			return new ImmutablePair<SharedSlot, Locality>(slot, didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
 		}
 		else {
 			return null;
 		}
 	}
 	
+	
+	void releaseSubSlot(SubSlot subslot, SharedSlot sharedSlot) {
+		
+		AbstractID groupId = subslot.getGroupId();
+		
+		synchronized (lock) {
+
+			if (!allSlots.contains(sharedSlot)) {
+				throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
+			}
+			
+			int slotsRemaining = sharedSlot.releaseSlot(subslot);
+			
+			if (slotsRemaining == 0) {
+				// this was the last sub slot. remove this from the availability list 
+				// and trigger disposal
+				try {
+					allSlots.remove(sharedSlot);
+					
+					Instance location = sharedSlot.getAllocatedSlot().getInstance();
+
+					if (groupId != null) {
+						for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> mapEntry : availableSlotsPerJid.entrySet()) {
+							AbstractID id = mapEntry.getKey();
+							
+							// hack: we identify co location hint entries by the fact that they are keyed
+							//       by an abstract id, rather than a job vertex id
+							if (id.getClass() == AbstractID.class || id.equals(groupId)) {
+								continue;
+							}
+							
+							Map<Instance, List<SharedSlot>> map = mapEntry.getValue();
+							List<SharedSlot> list = map.get(location);
+							if (list == null || !list.remove(sharedSlot)) {
+								throw new IllegalStateException("Bug: SharedSlot was not available to another vertex type that it was not allocated for before.");
+							}
+							if (list.isEmpty()) {
+								map.remove(location);
+							}
+						}
+					}
+				} finally {
+					sharedSlot.dispose();
+				}
+			}
+			else if (groupId != null) {
+				// make the shared slot available again to tasks within its group
+				Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupId);
+				
+				// sanity check
+				if (slotsForJid == null) {
+					throw new IllegalStateException("Trying to return a slot for group " + groupId + 
+							" when available slots indicated that all slots were available.");
+				}
+				
+				putIntoMultiMap(slotsForJid, sharedSlot.getAllocatedSlot().getInstance(), sharedSlot);
+			}
+		}
+	}
+	
+	
+	
+	
 	// --------------------------------------------------------------------------------------------
 	//  State
 	// --------------------------------------------------------------------------------------------
@@ -237,7 +317,7 @@ public class SlotSharingGroupAssignment {
 	}
 	
 	public int getNumberOfAvailableSlotsForJid(JobVertexID jid) {
-		synchronized (allSlots) {
+		synchronized (lock) {
 			Map<Instance, List<SharedSlot>> available = availableSlotsPerJid.get(jid);
 			
 			if (available != null) {
@@ -255,6 +335,10 @@ public class SlotSharingGroupAssignment {
 			}
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	
 	
 	// --------------------------------------------------------------------------------------------
 	//  Utilities
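
The bookkeeping above leans on three multi-map helpers that this hunk
references but does not show: putIntoMultiMap, removeFromMultiMap and
pollFromMultiMap. Reconstructed from their call sites, a plausible minimal
form looks like this (plus java.util imports; the actual utilities live
further down in this file and may differ in detail):

    private static void putIntoMultiMap(Map<Instance, List<SharedSlot>> map,
            Instance location, SharedSlot slot) {
        List<SharedSlot> slots = map.get(location);
        if (slots == null) {
            slots = new ArrayList<SharedSlot>();
            map.put(location, slots);
        }
        slots.add(slot);
    }

    private static SharedSlot removeFromMultiMap(Map<Instance, List<SharedSlot>> map,
            Instance location) {
        List<SharedSlot> slots = map.get(location);
        if (slots == null || slots.isEmpty()) {
            return null;
        }
        SharedSlot slot = slots.remove(slots.size() - 1);
        if (slots.isEmpty()) {
            map.remove(location);
        }
        return slot;
    }

    private static SharedSlot pollFromMultiMap(Map<Instance, List<SharedSlot>> map) {
        Iterator<Map.Entry<Instance, List<SharedSlot>>> iter = map.entrySet().iterator();
        if (!iter.hasNext()) {
            return null;
        }
        List<SharedSlot> slots = iter.next().getValue();
        SharedSlot slot = slots.remove(slots.size() - 1);
        if (slots.isEmpty()) {
            iter.remove();
        }
        return slot;
    }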

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
index ca2fb5e..7ff2990 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
@@ -18,25 +18,25 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.instance.AllocatedSlot;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 public class SubSlot extends AllocatedSlot {
 
 	private final SharedSlot sharedSlot;
 	
-	private final JobVertexID jid;
+	private final AbstractID groupId;
 	
 	private final int subSlotNumber;
 	
 	
-	public SubSlot(SharedSlot sharedSlot, int subSlotNumber, JobVertexID jid) {
+	public SubSlot(SharedSlot sharedSlot, int subSlotNumber, AbstractID groupId) {
 		super(sharedSlot.getAllocatedSlot().getJobID(),
 				sharedSlot.getAllocatedSlot().getInstance(),
 				sharedSlot.getAllocatedSlot().getSlotNumber());
 		
 		this.sharedSlot = sharedSlot;
-		this.jid = jid;
+		this.groupId = groupId;
 		this.subSlotNumber = subSlotNumber;
 	}
 	
@@ -59,8 +59,8 @@ public class SubSlot extends AllocatedSlot {
 		return this.sharedSlot;
 	}
 	
-	public JobVertexID getJobVertexId() {
-		return jid;
+	public AbstractID getGroupId() {
+		return groupId;
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ExtendedManagementProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ExtendedManagementProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ExtendedManagementProtocol.java
index 9e3e22e..909a595 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ExtendedManagementProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ExtendedManagementProtocol.java
@@ -59,5 +59,7 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol {
 	 * @return number of available slots
 	 * @throws IOException
 	 */
-	int getAvailableSlots() throws IOException;
+	int getTotalNumberOfRegisteredSlots() throws IOException;
+	
+	int getNumberOfSlotsAvailableToScheduler() throws IOException;
 }
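
The former getAvailableSlots() splits into two differently scoped counters:
the total capacity registered with the InstanceManager versus the slots the
Scheduler can currently hand out (compare the JobManager implementation
above). A usage sketch, with jm assumed to be a connected
ExtendedManagementProtocol proxy:

    int registered  = jm.getTotalNumberOfRegisteredSlots();
    int schedulable = jm.getNumberOfSlotsAvailableToScheduler();

    // schedulable counts only currently free slots, so it ranges
    // between 0 and the registered total
    assert schedulable >= 0 && schedulable <= registered;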

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index 14a73e1..1d5471e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -46,11 +46,11 @@ public class JobManagerTestUtils {
 		// max time is 5 seconds
 		long deadline = System.currentTimeMillis() + 5000;
 		
-		while (jm.getAvailableSlots() < numSlots && System.currentTimeMillis() < deadline) {
+		while (jm.getNumberOfSlotsAvailableToScheduler() < numSlots && System.currentTimeMillis() < deadline) {
 			Thread.sleep(10);
 		}
 		
-		assertEquals(numSlots, jm.getAvailableSlots());
+		assertEquals(numSlots, jm.getNumberOfSlotsAvailableToScheduler());
 		
 		return jm;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index 70b3ad9..b07cc87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -68,7 +68,7 @@ public class JobManagerITCase {
 			
 			try {
 				
-				assertEquals(1, jm.getAvailableSlots());
+				assertEquals(1, jm.getTotalNumberOfRegisteredSlots());
 				
 				// we need to register the job at the library cache manager (with no libraries)
 				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -141,7 +141,7 @@ public class JobManagerITCase {
 			
 			try {
 				
-				assertEquals(NUM_TASKS, jm.getAvailableSlots());
+				assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
 				
 				// we need to register the job at the library cache manager (with no libraries)
 				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -535,7 +535,7 @@ public class JobManagerITCase {
 					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				assertEquals(NUM_TASKS, jm.getAvailableSlots());
+				assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
 				
 				// we need to register the job at the library cache manager (with no libraries)
 				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -599,7 +599,7 @@ public class JobManagerITCase {
 					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				assertEquals(NUM_TASKS, jm.getAvailableSlots());
+				assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
 				
 				// we need to register the job at the library cache manager (with no libraries)
 				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -727,7 +727,7 @@ public class JobManagerITCase {
 					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				assertEquals(NUM_TASKS, jm.getAvailableSlots());
+				assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
 				
 				// we need to register the job at the library cache manager (with no libraries)
 				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -788,13 +788,13 @@ public class JobManagerITCase {
 			
 			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
 			
-			final JobManager jm = startJobManager(NUM_TASKS);
+			final JobManager jm = startJobManager(2*NUM_TASKS);
 			
 			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
 					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				assertEquals(NUM_TASKS, jm.getAvailableSlots());
+				assertEquals(2*NUM_TASKS, jm.getNumberOfSlotsAvailableToScheduler());
 				
 				// we need to register the job at the library cache manager (with no libraries)
 				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index 0efe10b..7ea4fd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 
 import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
 import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex;
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertexWithLocation;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -48,12 +49,13 @@ public class ScheduleWithCoLocationHintTest {
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
 			
-			CoLocationConstraint c1 = new CoLocationConstraint();
-			CoLocationConstraint c2 = new CoLocationConstraint();
-			CoLocationConstraint c3 = new CoLocationConstraint();
-			CoLocationConstraint c4 = new CoLocationConstraint();
-			CoLocationConstraint c5 = new CoLocationConstraint();
-			CoLocationConstraint c6 = new CoLocationConstraint();
+			CoLocationGroup ccg = new CoLocationGroup();
+			CoLocationConstraint c1 = new CoLocationConstraint(ccg);
+			CoLocationConstraint c2 = new CoLocationConstraint(ccg);
+			CoLocationConstraint c3 = new CoLocationConstraint(ccg);
+			CoLocationConstraint c4 = new CoLocationConstraint(ccg);
+			CoLocationConstraint c5 = new CoLocationConstraint(ccg);
+			CoLocationConstraint c6 = new CoLocationConstraint(ccg);
 			
 			// schedule 4 tasks from the first vertex group
 			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1));
@@ -174,7 +176,7 @@ public class ScheduleWithCoLocationHintTest {
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
-			CoLocationConstraint c1 = new CoLocationConstraint();
+			CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
 			
 			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
 			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1));
@@ -218,7 +220,7 @@ public class ScheduleWithCoLocationHintTest {
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
-			CoLocationConstraint c1 = new CoLocationConstraint();
+			CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
 			
 			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
 			s1.releaseSlot();
@@ -260,10 +262,12 @@ public class ScheduleWithCoLocationHintTest {
 			
 			assertEquals(4, scheduler.getNumberOfAvailableSlots());
 			
-			CoLocationConstraint clc1 = new CoLocationConstraint();
-			CoLocationConstraint clc2 = new CoLocationConstraint();
-			CoLocationConstraint clc3 = new CoLocationConstraint();
-			CoLocationConstraint clc4 = new CoLocationConstraint();
+			CoLocationGroup grp = new CoLocationGroup();
+			CoLocationConstraint clc1 = new CoLocationConstraint(grp);
+			CoLocationConstraint clc2 = new CoLocationConstraint(grp);
+			CoLocationConstraint clc3 = new CoLocationConstraint(grp);
+			CoLocationConstraint clc4 = new CoLocationConstraint(grp);
+			
 			SlotSharingGroup shareGroup = new SlotSharingGroup();
 
 			// first wave
@@ -303,4 +307,298 @@ public class ScheduleWithCoLocationHintTest {
 			fail(e.getMessage());
 		}
 	}
+
+	
+	@Test
+	public void testGetsNonLocalFromSharingGroupFirst() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			JobVertexID jid3 = new JobVertexID();
+			
+			Scheduler scheduler = new Scheduler();
+			
+			Instance i1 = getRandomInstance(1);
+			Instance i2 = getRandomInstance(1);
+			
+			scheduler.newInstanceAvailable(i2);
+			scheduler.newInstanceAvailable(i1);
+			
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			
+			CoLocationGroup ccg = new CoLocationGroup();
+			CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
+			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
+
+			// schedule something into the shared group so that both instances are in the sharing group
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
+			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup));
+			
+			// schedule one locally to instance 1
+			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc1));
+
+			// schedule with co location constraint (yet unassigned) and a preference for
+			// instance 1, but it can only get instance 2
+			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
+			
+			// schedule something into the assigned co-location constraints and check that they override the
+			// other preferences
+			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, i2), sharingGroup, cc1));
+			AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, i1), sharingGroup, cc2));
+			
+			// check that each shared slot holds three sub slots
+			assertEquals(3, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
+			assertEquals(3, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
+			
+			assertEquals(s1.getInstance(), s3.getInstance());
+			assertEquals(s2.getInstance(), s4.getInstance());
+			assertEquals(s1.getInstance(), s5.getInstance());
+			assertEquals(s2.getInstance(), s6.getInstance());
+			
+			// check the scheduler's bookkeeping
+			assertEquals(0, scheduler.getNumberOfAvailableSlots());
+			
+			assertEquals(5, scheduler.getNumberOfLocalizedAssignments());
+			assertEquals(1, scheduler.getNumberOfNonLocalizedAssignments());
+			assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
+			
+			// release some slots, be sure that new available ones come up
+			s1.releaseSlot();
+			s2.releaseSlot();
+			s3.releaseSlot();
+			s4.releaseSlot();
+			s5.releaseSlot();
+			s6.releaseSlot();
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSlotReleasedInBetween() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			
+			Scheduler scheduler = new Scheduler();
+			
+			Instance i1 = getRandomInstance(1);
+			Instance i2 = getRandomInstance(1);
+			
+			scheduler.newInstanceAvailable(i2);
+			scheduler.newInstanceAvailable(i1);
+			
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			
+			CoLocationGroup ccg = new CoLocationGroup();
+			CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
+			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
+
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
+			
+			s1.releaseSlot();
+			s2.releaseSlot();
+			
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
+
+			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1));
+			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
+			
+			// still preserves the previous instance mapping
+			assertEquals(i1, s3.getInstance());
+			assertEquals(i2, s4.getInstance());
+			
+			s3.releaseSlot();
+			s4.releaseSlot();
+
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			
+			assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
+			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
+			assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSlotReleasedInBetweenAndNoNewLocal() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			JobVertexID jidx = new JobVertexID();
+			
+			Scheduler scheduler = new Scheduler();
+			
+			Instance i1 = getRandomInstance(1);
+			Instance i2 = getRandomInstance(1);
+			
+			scheduler.newInstanceAvailable(i2);
+			scheduler.newInstanceAvailable(i1);
+			
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			
+			CoLocationGroup ccg = new CoLocationGroup();
+			CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
+			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
+
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
+			
+			s1.releaseSlot();
+			s2.releaseSlot();
+			
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
+
+			AllocatedSlot sa = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)));
+			AllocatedSlot sb = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)));
+			
+			try {
+				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1));
+				fail("should not be able to find a resource");
+			} catch (NoResourceAvailableException e) {
+				// good
+			} catch (Exception e) {
+				fail("wrong exception");
+			}
+			
+			sa.releaseSlot();
+			sb.releaseSlot();
+
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			
+			assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
+			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
+			assertEquals(2, scheduler.getNumberOfUnconstrainedAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testScheduleOutOfOrder() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			
+			Scheduler scheduler = new Scheduler();
+			
+			Instance i1 = getRandomInstance(1);
+			Instance i2 = getRandomInstance(1);
+			
+			scheduler.newInstanceAvailable(i2);
+			scheduler.newInstanceAvailable(i1);
+			
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			
+			CoLocationGroup ccg = new CoLocationGroup();
+			CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
+			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
+
+			// schedule something from the second job vertex id before the first is filled,
+			// and give locality preferences that hint at using the same shared slot for both
+			// co location constraints (which we seek to prevent)
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc2));
+
+			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc1));
+			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup, cc2));
+			
+			// check that each shared slot holds two sub slots
+			assertEquals(2, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
+			assertEquals(2, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
+			
+			assertEquals(s1.getInstance(), s3.getInstance());
+			assertEquals(s2.getInstance(), s4.getInstance());
+			
+			// check the scheduler's bookkeeping
+			assertEquals(0, scheduler.getNumberOfAvailableSlots());
+			
+			assertEquals(3, scheduler.getNumberOfLocalizedAssignments());
+			assertEquals(1, scheduler.getNumberOfNonLocalizedAssignments());
+			assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
+			
+			// release some slots, be sure that new available ones come up
+			s1.releaseSlot();
+			s2.releaseSlot();
+			s3.releaseSlot();
+			s4.releaseSlot();
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void nonColocationFollowsCoLocation() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			
+			Scheduler scheduler = new Scheduler();
+			
+			Instance i1 = getRandomInstance(1);
+			Instance i2 = getRandomInstance(1);
+			
+			scheduler.newInstanceAvailable(i2);
+			scheduler.newInstanceAvailable(i1);
+			
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			
+			CoLocationGroup ccg = new CoLocationGroup();
+			CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
+			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
+
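+			// establish the two co-location constraints on two different instances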
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
+
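+			// the non-co-located tasks of jid2 should follow into the shared slots
+			// that the constraints established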
+			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup));
+			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup));
+			
+			// check that each of the two shared slots got two sub-slots
+			assertEquals(2, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
+			assertEquals(2, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
+			
+			s1.releaseSlot();
+			s2.releaseSlot();
+			s3.releaseSlot();
+			s4.releaseSlot();
+			
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index d2e7598..09a416e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -101,6 +101,7 @@ public class SchedulerTestUtils {
 		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
 		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
+		when(vertex.getSimpleName()).thenReturn("TEST-VERTEX");
 		
 		Execution execution = mock(Execution.class);
 		when(execution.getVertex()).thenReturn(vertex);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
index de22999..7859b33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
@@ -19,14 +19,13 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-
 import static org.junit.Assert.*;
 
 import org.junit.Test;
-
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -37,7 +36,17 @@ public class SharedSlotsTest {
 	public void createAndDoNotRelease() {
 		try {
 			SlotSharingGroupAssignment assignment = mock(SlotSharingGroupAssignment.class);
-			when(assignment.sharedSlotAvailableForJid(any(SharedSlot.class), any(JobVertexID.class), any(boolean.class))).thenReturn(false);
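+			// mock the assignment: releasing a sub-slot only removes it from its shared slot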
+			doAnswer(new Answer<Void>() {
+				@Override
+				public Void answer(InvocationOnMock invocation) throws Throwable {
+					final SubSlot sub = (SubSlot) invocation.getArguments()[0];
+					final SharedSlot shared = (SharedSlot) invocation.getArguments()[1];
+					shared.releaseSlot(sub);
+					return null;
+				}
+				
+			}).when(assignment).releaseSubSlot(any(SubSlot.class), any(SharedSlot.class));
 			
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 			
@@ -77,8 +85,19 @@
 	public void createAndRelease() {
 		try {
 			SlotSharingGroupAssignment assignment = mock(SlotSharingGroupAssignment.class);
-			when(assignment.sharedSlotAvailableForJid(any(SharedSlot.class), any(JobVertexID.class), eq(false))).thenReturn(false);
-			when(assignment.sharedSlotAvailableForJid(any(SharedSlot.class), any(JobVertexID.class), eq(true))).thenReturn(true);
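+			// mock the assignment: dispose the shared slot once its last sub-slot is released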
+			doAnswer(new Answer<Void>() {
+				@Override
+				public Void answer(InvocationOnMock invocation) throws Throwable {
+					final SubSlot sub = (SubSlot) invocation.getArguments()[0];
+					final SharedSlot shared = (SharedSlot) invocation.getArguments()[1];
+					if (shared.releaseSlot(sub) == 0) {
+						shared.dispose();
+					}
+					return null;
+				}
+				
+			}).when(assignment).releaseSubSlot(any(SubSlot.class), any(SharedSlot.class));
 			
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java
index 3b4f293..61ac104 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java
@@ -184,7 +184,7 @@ public final class ServerTestUtils {
 	public static void waitForJobManagerToBecomeReady(final ExtendedManagementProtocol jobManager) throws IOException,
 			InterruptedException {
 
-		while (jobManager.getAvailableSlots() == 0) {
+		while (jobManager.getTotalNumberOfRegisteredSlots() == 0) {
 			Thread.sleep(100);
 		}
 	}