You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:11 UTC

[47/63] [abbrv] git commit: Add options strict co-location constraints to scheduler

Add options strict co-location constraints to scheduler


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/91871757
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/91871757
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/91871757

Branch: refs/heads/master
Commit: 91871757bebf7404324334d79755e3f117752966
Parents: b3c30ca
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 15 00:39:00 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:49 2014 +0200

----------------------------------------------------------------------
 .../scheduler/CoLocationConstraint.java         |  64 ++++++
 .../jobmanager/scheduler/ScheduledUnit.java     |  36 +--
 .../runtime/jobmanager/scheduler/Scheduler.java |  58 ++++-
 .../jobmanager/scheduler/SharedSlot.java        |  27 ++-
 .../runtime/jobmanager/scheduler/SubSlot.java   |  11 +-
 .../ScheduleWithCoLocationHintTest.java         | 225 +++++++++++++++++++
 6 files changed, 400 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/91871757/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
new file mode 100644
index 0000000..26332c8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+public class CoLocationConstraint {
+	
+	private static final AtomicReferenceFieldUpdater<CoLocationConstraint, SharedSlot> UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(CoLocationConstraint.class, SharedSlot.class, "slot");
+	
+	private volatile SharedSlot slot;
+
+	
+	public boolean isUnassigned() {
+		return slot == null;
+	}
+	
+	public SharedSlot getSlot() {
+		return slot;
+	}
+	
+	public SharedSlot swapInNewSlot(AllocatedSlot newSlot) {
+		SharedSlot newShared = new SharedSlot(newSlot);
+		
+		// atomic swap/release-other to prevent resource leaks
+		while (true) {
+			SharedSlot current = this.slot;
+			if (UPDATER.compareAndSet(this, current, newShared)) {
+				if (current != null) {
+					current.rease();
+				}
+				return newShared;
+			}
+		}
+	}
+	
+	public SubSlot allocateSubSlot(JobVertexID jid) {
+		if (this.slot == null) {
+			throw new IllegalStateException("Location constraint has not yet been assigned a slot.");
+		}
+		
+		return slot.allocateSubSlot(jid);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/91871757/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
index 10190f5..28fd916 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
@@ -21,35 +21,40 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import com.google.common.base.Preconditions;
+
 public class ScheduledUnit {
 	
 	private final Execution vertexExecution;
 	
 	private final SlotSharingGroup sharingGroup;
 	
+	private final CoLocationConstraint locationConstraint;
+	
 	// --------------------------------------------------------------------------------------------
 	
-	public ScheduledUnit(Execution taskVertex) {
-		if (taskVertex == null) {
-			throw new NullPointerException();
-		}
+	public ScheduledUnit(Execution task) {
+		Preconditions.checkNotNull(task);
 		
-		this.vertexExecution = taskVertex;
+		this.vertexExecution = task;
 		this.sharingGroup = null;
+		this.locationConstraint = null;
 	}
 	
-	public ScheduledUnit(Execution taskVertex, SlotSharingGroup sharingUnit) {
-		if (taskVertex == null) {
-			throw new NullPointerException();
-		}
+	public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit) {
+		Preconditions.checkNotNull(task);
 		
-		this.vertexExecution = taskVertex;
+		this.vertexExecution = task;
 		this.sharingGroup = sharingUnit;
+		this.locationConstraint = null;
 	}
 	
-	ScheduledUnit() {
-		this.vertexExecution = null;
+	public ScheduledUnit(Execution task, CoLocationConstraint locationConstraint) {
+		Preconditions.checkNotNull(task);
+		
+		this.vertexExecution = task;
 		this.sharingGroup = null;
+		this.locationConstraint = locationConstraint;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -65,11 +70,16 @@ public class ScheduledUnit {
 	public SlotSharingGroup getSlotSharingGroup() {
 		return sharingGroup;
 	}
+	
+	public CoLocationConstraint getLocationConstraint() {
+		return locationConstraint;
+	}
 
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
 	public String toString() {
-		return "{vertex=" + vertexExecution.getVertexWithAttempt() + ", sharingUnit=" + sharingGroup + '}';
+		return "{task=" + vertexExecution.getVertexWithAttempt() + ", sharingUnit=" + sharingGroup + 
+				", locationConstraint=" + locationConstraint + '}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/91871757/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index cec8fb7..eb2c0a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutorService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
@@ -149,13 +148,66 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 	
 		synchronized (globalLock) {
+			
 			// 1)  === If the task has a strict co-schedule hint, obey it ===
-
+			
+			CoLocationConstraint locationConstraint = task.getLocationConstraint();
+			if (locationConstraint != null) {
+				// location constraints can never be scheduled in a queued fashion
+				if (queueIfNoResource) {
+					throw new IllegalArgumentException("A task with a location constraint was scheduled in a queued fashion.");
+				}
+				
+				// since we are inside the global lock scope, we can check, allocate, and assign
+				// in one atomic action. however, slots may die and be deallocated
+				
+				// (a) is the constraint has not yet has a slot, get one
+				if (locationConstraint.isUnassigned()) {
+					// try and get a slot
+					AllocatedSlot newSlot = getFreeSlotForTask(vertex);
+					if (newSlot == null) {
+						throw new NoResourceAvailableException();
+					}
+					SharedSlot sl = locationConstraint.swapInNewSlot(newSlot);
+					SubSlot slot = sl.allocateSubSlot(vertex.getJobvertexId());
+					
+					updateLocalityCounters(newSlot.getLocality());
+					return slot;
+				}
+				else {
+					// try to get a subslot. returns null, if the location's slot has been released
+					// in the meantime
+					SubSlot slot = locationConstraint.allocateSubSlot(vertex.getJobvertexId());
+					if (slot == null) {
+						// get a new slot. at the same instance!!!
+						Instance location = locationConstraint.getSlot().getAllocatedSlot().getInstance();
+						AllocatedSlot newSlot;
+						try {
+							newSlot = location.allocateSlot(vertex.getJobId());
+						} catch (InstanceDiedException e) {
+							throw new NoResourceAvailableException("The instance of the required location died.");
+						}
+						if (newSlot == null) {
+							throw new NoResourceAvailableException();
+						}
+						SharedSlot sharedSlot = locationConstraint.swapInNewSlot(newSlot);
+						slot = sharedSlot.allocateSubSlot(vertex.getJobvertexId());
+					}
+					
+					updateLocalityCounters(Locality.LOCAL);
+					return slot;
+				}
+			}
 		
 			// 2)  === If the task has a slot sharing group, schedule with shared slots ===
 			
 			SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
 			if (sharingUnit != null) {
+				
+				if (queueIfNoResource) {
+					throw new IllegalArgumentException("A task with a vertex sharing group was scheduled in a queued fashion.");
+				}
+				
 				final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
 				
 				AllocatedSlot newSlot = null;
@@ -263,7 +315,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 					}
 				}
 				
-				if (instanceToUse == null) {					
+				if (instanceToUse == null) {
 					instanceToUse = this.instancesWithAvailableResources.poll();
 					locality = Locality.NON_LOCAL;
 					if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/91871757/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
index 36d8a8b..0f3687a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
@@ -48,6 +48,16 @@ public class SharedSlot {
 		this.subSlots = new HashSet<SubSlot>();
 	}
 	
+	public SharedSlot(AllocatedSlot allocatedSlot) {
+		if (allocatedSlot == null) {
+			throw new NullPointerException();
+		}
+		
+		this.allocatedSlot = allocatedSlot;
+		this.assignmentGroup = null;;
+		this.subSlots = new HashSet<SubSlot>();
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 	public AllocatedSlot getAllocatedSlot() {
@@ -76,6 +86,17 @@ public class SharedSlot {
 		}
 	}
 	
+	public void rease() {
+		synchronized (this.subSlots) {
+			disposed = true;
+			for (SubSlot ss : subSlots) {
+				ss.releaseSlot();
+			}
+		}
+		
+		allocatedSlot.releaseSlot();
+	}
+	
 	void returnAllocatedSlot(SubSlot slot) {
 		boolean release;
 		
@@ -84,7 +105,11 @@ public class SharedSlot {
 				throw new IllegalArgumentException("Wrong shared slot for subslot.");
 			}
 			
-			release = assignmentGroup.sharedSlotAvailableForJid(this, slot.getJobVertexId(), this.subSlots.isEmpty());
+			if (assignmentGroup != null) {
+				release = assignmentGroup.sharedSlotAvailableForJid(this, slot.getJobVertexId(), this.subSlots.isEmpty());
+			} else {
+				release = subSlots.isEmpty();
+			}
 			
 			if (release) {
 				disposed = true;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/91871757/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
index 003239d..ca2fb5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
@@ -45,10 +45,13 @@ public class SubSlot extends AllocatedSlot {
 	public void releaseSlot() {
 		// cancel everything, if there is something. since this is atomically status based,
 		// it will not happen twice if another attempt happened before or concurrently
-		cancel();
-		
-		if (markReleased()) {
-			this.sharedSlot.returnAllocatedSlot(this);
+		try {
+			cancel();
+		}
+		finally {
+			if (markReleased()) {
+				this.sharedSlot.returnAllocatedSlot(this);
+			}
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/91871757/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
new file mode 100644
index 0000000..0ee9346
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+
+public class ScheduleWithCoLocationHintTest {
+
+	@Test
+	public void schedule() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			
+			Scheduler scheduler = new Scheduler();
+			
+			Instance i1 = getRandomInstance(2);
+			Instance i2 = getRandomInstance(2);
+			Instance i3 = getRandomInstance(2);
+			
+			scheduler.newInstanceAvailable(i1);
+			scheduler.newInstanceAvailable(i2);
+			scheduler.newInstanceAvailable(i3);
+			
+			assertEquals(6, scheduler.getNumberOfAvailableSlots());
+			
+			CoLocationConstraint c1 = new CoLocationConstraint();
+			CoLocationConstraint c2 = new CoLocationConstraint();
+			CoLocationConstraint c3 = new CoLocationConstraint();
+			CoLocationConstraint c4 = new CoLocationConstraint();
+			CoLocationConstraint c5 = new CoLocationConstraint();
+			CoLocationConstraint c6 = new CoLocationConstraint();
+			
+			// schedule 4 tasks from the first vertex group
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), c1));
+			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 6), c2));
+			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 6), c3));
+			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 6), c4));
+			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 6), c1));
+			AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 6), c2));
+			AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 6), c3));
+			AllocatedSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 6), c5));
+			AllocatedSlot s9 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 6), c6));
+			AllocatedSlot s10 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 6), c4));
+			AllocatedSlot s11 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 6), c5));
+			AllocatedSlot s12 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 6), c6));
+
+			assertNotNull(s1);
+			assertNotNull(s2);
+			assertNotNull(s3);
+			assertNotNull(s4);
+			assertNotNull(s5);
+			assertNotNull(s6);
+			assertNotNull(s7);
+			assertNotNull(s8);
+			assertNotNull(s9);
+			assertNotNull(s10);
+			assertNotNull(s11);
+			assertNotNull(s12);
+			
+			assertEquals(s1.getInstance(), s5.getInstance());
+			assertEquals(s2.getInstance(), s6.getInstance());
+			assertEquals(s3.getInstance(), s7.getInstance());
+			assertEquals(s4.getInstance(), s10.getInstance());
+			assertEquals(s8.getInstance(), s11.getInstance());
+			assertEquals(s9.getInstance(), s12.getInstance());
+			
+			assertEquals(c1.getSlot().getAllocatedSlot().getInstance(), s1.getInstance());
+			assertEquals(c2.getSlot().getAllocatedSlot().getInstance(), s2.getInstance());
+			assertEquals(c3.getSlot().getAllocatedSlot().getInstance(), s3.getInstance());
+			assertEquals(c4.getSlot().getAllocatedSlot().getInstance(), s4.getInstance());
+			assertEquals(c5.getSlot().getAllocatedSlot().getInstance(), s8.getInstance());
+			assertEquals(c6.getSlot().getAllocatedSlot().getInstance(), s9.getInstance());
+			
+			// check the scheduler's bookkeeping
+			assertEquals(0, scheduler.getNumberOfAvailableSlots());
+			
+			// the first assignments are unconstrained, co.-schedulings are constrained
+			assertEquals(6, scheduler.getNumberOfLocalizedAssignments());
+			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
+			assertEquals(6, scheduler.getNumberOfUnconstrainedAssignments());
+			
+			// release some slots, be sure that new available ones come up
+			s4.releaseSlot();
+			s10.releaseSlot();
+			assertEquals(1, scheduler.getNumberOfAvailableSlots());
+			
+			AllocatedSlot single = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)));
+			assertNotNull(single);
+			
+			s1.releaseSlot();
+			s2.releaseSlot();
+			s3.releaseSlot();
+			s5.releaseSlot();
+			s6.releaseSlot();
+			s7.releaseSlot();
+			s8.releaseSlot();
+			s9.releaseSlot();
+			s11.releaseSlot();
+			s12.releaseSlot();
+			
+			assertEquals(5, scheduler.getNumberOfAvailableSlots());
+			
+			assertEquals(6, scheduler.getNumberOfLocalizedAssignments());
+			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
+			assertEquals(7, scheduler.getNumberOfUnconstrainedAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void scheduleWithIntermediateRelease() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			JobVertexID jid3 = new JobVertexID();
+			JobVertexID jid4 = new JobVertexID();
+			
+			Scheduler scheduler = new Scheduler();
+			
+			Instance i1 = getRandomInstance(1);
+			Instance i2 = getRandomInstance(1);
+			
+			scheduler.newInstanceAvailable(i1);
+			scheduler.newInstanceAvailable(i2);
+			
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			
+			CoLocationConstraint c1 = new CoLocationConstraint();
+			
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), c1));
+			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), c1));
+			
+			AllocatedSlot sSolo = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 1)));
+			
+			Instance loc = s1.getInstance();
+			
+			s1.releaseSlot();
+			s2.releaseSlot();
+			sSolo.releaseSlot();
+			
+			AllocatedSlot sNew = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), c1));
+			assertEquals(loc, sNew.getInstance());
+			
+			assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
+			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
+			assertEquals(2, scheduler.getNumberOfUnconstrainedAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void scheduleWithReleaseNoResource() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			JobVertexID jid3 = new JobVertexID();
+			
+			Scheduler scheduler = new Scheduler();
+			
+			Instance i1 = getRandomInstance(1);
+			Instance i2 = getRandomInstance(1);
+			
+			scheduler.newInstanceAvailable(i1);
+			scheduler.newInstanceAvailable(i2);
+			
+			assertEquals(2, scheduler.getNumberOfAvailableSlots());
+			
+			CoLocationConstraint c1 = new CoLocationConstraint();
+			
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), c1));
+			s1.releaseSlot();
+			
+			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1)));
+			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2)));
+			
+			
+			try {
+				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), c1));
+				fail("Scheduled even though no resource was available.");
+			} catch (NoResourceAvailableException e) {
+				// expected
+			}
+			
+			assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
+			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
+			assertEquals(3, scheduler.getNumberOfUnconstrainedAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}