You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/04 21:10:48 UTC

[1/2] flink git commit: [FLINK-4459] [distributed runtime] Introduce SlotProvider for Scheduler

Repository: flink
Updated Branches:
  refs/heads/master 502a79d39 -> 6e40f5901


http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index fd0523b..c4121f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -62,10 +62,10 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(i2);
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup));
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -76,7 +76,7 @@ public class SchedulerSlotSharingTest {
 			
 			// we cannot schedule another task from the first vertex group
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -90,7 +90,7 @@ public class SchedulerSlotSharingTest {
 			s3.releaseSlot();
 			
 			// allocate another slot from that group
-			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup));
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get();
 			assertNotNull(s5);
 			
 			// release all old slots
@@ -98,9 +98,9 @@ public class SchedulerSlotSharingTest {
 			s2.releaseSlot();
 			s4.releaseSlot();
 			
-			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup));
-			SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup));
-			SimpleSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup));
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false).get();
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false).get();
+			SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false).get();
 			
 			assertNotNull(s6);
 			assertNotNull(s7);
@@ -135,7 +135,7 @@ public class SchedulerSlotSharingTest {
 	}
 	
 	@Test
-	public void scheduleImmediatelyWithSharing() {
+	public void allocateSlotWithSharing() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
@@ -147,10 +147,10 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup));
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -161,7 +161,7 @@ public class SchedulerSlotSharingTest {
 			
 			// we cannot schedule another task from the first vertex group
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -172,10 +172,10 @@ public class SchedulerSlotSharingTest {
 			}
 			
 			// schedule some tasks from the second ID group
-			SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup));
-			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup));
-			SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup));
-			SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup));
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false).get();
 			
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
@@ -184,7 +184,7 @@ public class SchedulerSlotSharingTest {
 			
 			// we cannot schedule another task from the second vertex group
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -205,7 +205,7 @@ public class SchedulerSlotSharingTest {
 			
 			// we can still not schedule anything from the second group of vertices
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -216,7 +216,7 @@ public class SchedulerSlotSharingTest {
 			}
 			
 			// we can schedule something from the first vertex group
-			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
 			assertNotNull(s5);
 			
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -226,7 +226,7 @@ public class SchedulerSlotSharingTest {
 			
 			// now we release a slot from the second vertex group and schedule another task from that group
 			s2_2.releaseSlot();
-			SimpleSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
+			SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
 			assertNotNull(s5_2);
 			
 			// release all slots
@@ -255,7 +255,7 @@ public class SchedulerSlotSharingTest {
 	}
 	
 	@Test
-	public void scheduleImmediatelyWithIntermediateTotallyEmptySharingGroup() {
+	public void allocateSlotWithIntermediateTotallyEmptySharingGroup() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
@@ -267,10 +267,10 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
 			
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
@@ -286,10 +286,10 @@ public class SchedulerSlotSharingTest {
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
 			
 			// schedule some tasks from the second ID group
-			SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
-			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
-			SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
-			SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
 
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
@@ -319,7 +319,7 @@ public class SchedulerSlotSharingTest {
 	}
 	
 	@Test
-	public void scheduleImmediatelyWithTemprarilyEmptyVertexGroup() {
+	public void allocateSlotWithTemprarilyEmptyVertexGroup() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
@@ -332,10 +332,10 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-			SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
-			SimpleSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-			SimpleSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
+			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
+			SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
+			SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
 			
 			assertNotNull(s1_1);
 			assertNotNull(s2_1);
@@ -345,10 +345,10 @@ public class SchedulerSlotSharingTest {
 			assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
 			
 			// schedule 4 tasks from the second vertex group
-			SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup));
-			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup));
-			SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup));
-			SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup));
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false).get();
 			
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
@@ -358,10 +358,10 @@ public class SchedulerSlotSharingTest {
 			assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
 			
 			// schedule 4 tasks from the third vertex group
-			SimpleSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup));
-			SimpleSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup));
-			SimpleSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup));
-			SimpleSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup));
+			SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false).get();
+			SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false).get();
+			SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false).get();
+			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false).get();
 			
 			assertNotNull(s1_3);
 			assertNotNull(s2_3);
@@ -373,7 +373,7 @@ public class SchedulerSlotSharingTest {
 			
 			// we cannot schedule another task from the second vertex group
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -389,9 +389,9 @@ public class SchedulerSlotSharingTest {
 			s3_2.releaseSlot();
 			s4_2.releaseSlot();
 			
-			SimpleSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup));
-			SimpleSlot s6_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup));
-			SimpleSlot s7_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup));
+			SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false).get();
+			SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false).get();
+			SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false).get();
 			
 			assertNotNull(s5_2);
 			assertNotNull(s6_2);
@@ -430,7 +430,7 @@ public class SchedulerSlotSharingTest {
 	}
 	
 	@Test
-	public void scheduleImmediatelyWithTemprarilyEmptyVertexGroup2() {
+	public void allocateSlotWithTemporarilyEmptyVertexGroup2() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
@@ -442,9 +442,9 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 1 tasks from the first vertex group and 2 from the second
-			SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup));
-			SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup));
-			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup));
+			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false).get();
+			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false).get();
 			
 			assertNotNull(s1_1);
 			assertNotNull(s2_1);
@@ -460,7 +460,7 @@ public class SchedulerSlotSharingTest {
 			
 			
 			// this should free one slot so we can allocate one non-shared
-			SimpleSlot sx = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1)));
+			SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false).get();
 			assertNotNull(sx);
 			
 			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -495,28 +495,28 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule some individual vertices
-			SimpleSlot sA1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 0, 2)));
-			SimpleSlot sA2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 1, 2)));
+			SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false).get();
+			SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false).get();
 			assertNotNull(sA1);
 			assertNotNull(sA2);
 			
 			// schedule some vertices in the sharing group
-			SimpleSlot s1_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-			SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
-			SimpleSlot s2_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
-			SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
+			SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
+			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
+			SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
+			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
 			assertNotNull(s1_0);
 			assertNotNull(s1_1);
 			assertNotNull(s2_0);
 			assertNotNull(s2_1);
 			
 			// schedule another isolated vertex
-			SimpleSlot sB1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 1, 3)));
+			SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false).get();
 			assertNotNull(sB1);
 			
 			// should not be able to schedule more vertices
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -527,7 +527,7 @@ public class SchedulerSlotSharingTest {
 			}
 			
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -538,7 +538,7 @@ public class SchedulerSlotSharingTest {
 			}
 			
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 0, 3)));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false);
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -549,7 +549,7 @@ public class SchedulerSlotSharingTest {
 			}
 			
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 0, 1)));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false);
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -562,8 +562,8 @@ public class SchedulerSlotSharingTest {
 			// release some isolated task and check that the sharing group may grow
 			sA1.releaseSlot();
 			
-			SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
 			
@@ -575,19 +575,19 @@ public class SchedulerSlotSharingTest {
 			assertEquals(1, scheduler.getNumberOfAvailableSlots());
 			
 			// schedule one more no-shared task
-			SimpleSlot sB0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 0, 3)));
+			SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get();
 			assertNotNull(sB0);
 			
 			// release the last of the original shared slots and allocate one more non-shared slot
 			s2_1.releaseSlot();
-			SimpleSlot sB2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 2, 3)));
+			SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false).get();
 			assertNotNull(sB2);
 			
 			
 			// release on non-shared and add some shared slots
 			sA2.releaseSlot();
-			SimpleSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
-			SimpleSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
+			SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
+			SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
 			assertNotNull(s1_3);
 			assertNotNull(s2_3);
 			
@@ -597,8 +597,8 @@ public class SchedulerSlotSharingTest {
 			s1_3.releaseSlot();
 			s2_3.releaseSlot();
 			
-			SimpleSlot sC0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 1, 2)));
-			SimpleSlot sC1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 0, 2)));
+			SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false).get();
+			SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false).get();
 			assertNotNull(sC0);
 			assertNotNull(sC1);
 			
@@ -646,8 +646,8 @@ public class SchedulerSlotSharingTest {
 			
 			
 			// schedule one to each instance
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get();
 			assertNotNull(s1);
 			assertNotNull(s2);
 			
@@ -656,8 +656,8 @@ public class SchedulerSlotSharingTest {
 			assertEquals(1, i2.getNumberOfAvailableSlots());
 			
 			// schedule one from the other group to each instance
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup));
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get();
 			assertNotNull(s3);
 			assertNotNull(s4);
 			
@@ -699,8 +699,8 @@ public class SchedulerSlotSharingTest {
 			
 			
 			// schedule one to each instance
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get();
 			assertNotNull(s1);
 			assertNotNull(s2);
 			
@@ -709,8 +709,8 @@ public class SchedulerSlotSharingTest {
 			assertEquals(2, i2.getNumberOfAvailableSlots());
 			
 			// schedule one from the other group to each instance
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup));
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get();
 			assertNotNull(s3);
 			assertNotNull(s4);
 			
@@ -750,14 +750,14 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(i2);
 			
 			// schedule until the one instance is full
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup));
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false).get();
 
 			// schedule two more with preference of same instance --> need to go to other instance
-			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup));
-			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup));
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -803,19 +803,19 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// allocate something from group 1 and 2 interleaved with schedule for group 3
-			SimpleSlot slot_1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-			SimpleSlot slot_1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
+			SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
+			SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
 
-			SimpleSlot slot_2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
-			SimpleSlot slot_2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
+			SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
+			SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
 			
-			SimpleSlot slot_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup));
+			SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get();
 			
-			SimpleSlot slot_1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-			SimpleSlot slot_1_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+			SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
+			SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
 			
-			SimpleSlot slot_2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
-			SimpleSlot slot_2_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
+			SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
+			SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
 			
 			// release groups 1 and 2
 			
@@ -831,10 +831,10 @@ public class SchedulerSlotSharingTest {
 			
 			// allocate group 4
 			
-			SimpleSlot slot_4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup));
-			SimpleSlot slot_4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup));
-			SimpleSlot slot_4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup));
-			SimpleSlot slot_4_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup));
+			SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get();
+			SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get();
+			SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get();
+			SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
 			
 			// release groups 3 and 4
 			
@@ -885,7 +885,7 @@ public class SchedulerSlotSharingTest {
 					@Override
 					public void run() {
 						try {
-							SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup));
+							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false).get();
 
 							sleepUninterruptibly(rnd.nextInt(5));
 							slot.releaseSlot();
@@ -908,7 +908,7 @@ public class SchedulerSlotSharingTest {
 					public void run() {
 						try {
 							if (flag3.compareAndSet(false, true)) {
-								SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup));
+								SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get();
 								
 								sleepUninterruptibly(5);
 								
@@ -937,7 +937,7 @@ public class SchedulerSlotSharingTest {
 					@Override
 					public void run() {
 						try {
-							SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup));
+							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false).get();
 							
 							// wait a bit till scheduling the successor
 							sleepUninterruptibly(rnd.nextInt(5));
@@ -964,7 +964,7 @@ public class SchedulerSlotSharingTest {
 					@Override
 					public void run() {
 						try {
-							SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup));
+							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false).get();
 							
 							// wait a bit till scheduling the successor
 							sleepUninterruptibly(rnd.nextInt(5));
@@ -1039,27 +1039,27 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// schedule one task for the first and second vertex
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false).get();
 			
 			assertTrue(  s1.getParent() == s2.getParent() );
 			assertEquals(3, scheduler.getNumberOfAvailableSlots());
 			
-			SimpleSlot s3_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup));
-			SimpleSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup));
-			SimpleSlot s4_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup));
-			SimpleSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup));
+			SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false).get();
+			SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false).get();
+			SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get();
+			SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get();
 			
 			s1.releaseSlot();
 			s2.releaseSlot();
 			
-			SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup));
-			SimpleSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup));
-			SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup));
-			SimpleSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup));
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false).get();
+			SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get();
+			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
 			
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false);
 				fail("should throw an exception");
 			}
 			catch (NoResourceAvailableException e) {


[2/2] flink git commit: [FLINK-4459] [distributed runtime] Introduce SlotProvider for Scheduler

Posted by se...@apache.org.
[FLINK-4459] [distributed runtime] Introduce SlotProvider for Scheduler

This closes #2424


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e40f590
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e40f590
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e40f590

Branch: refs/heads/master
Commit: 6e40f59015dcdb8529691318e8f5a33e831252b8
Parents: 502a79d
Author: Kurt Young <yk...@gmail.com>
Authored: Fri Aug 26 17:51:40 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Sep 4 23:09:59 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  15 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  32 +-
 .../executiongraph/ExecutionJobVertex.java      |   7 +-
 .../runtime/executiongraph/ExecutionVertex.java |   8 +-
 .../flink/runtime/instance/SlotProvider.java    |  48 +++
 .../runtime/jobmanager/scheduler/Scheduler.java |  27 +-
 .../ExecutionGraphMetricsTest.java              |   8 +-
 .../ExecutionVertexSchedulingTest.java          |  28 +-
 .../TerminalStateDeadlockTest.java              |  27 +-
 .../ScheduleWithCoLocationHintTest.java         | 303 +++++++++++--------
 .../scheduler/SchedulerIsolatedTasksTest.java   |  45 ++-
 .../scheduler/SchedulerSlotSharingTest.java     | 230 +++++++-------
 12 files changed, 430 insertions(+), 348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 846df49..6826365 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -34,13 +34,13 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -271,15 +271,15 @@ public class Execution {
 	 *       to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
 	 *       error sets the vertex state to failed and triggers the recovery logic.
 	 * 
-	 * @param scheduler The scheduler to use to schedule this execution attempt.
+	 * @param slotProvider The slot provider to use to allocate slot for this execution attempt.
 	 * @param queued Flag to indicate whether the scheduler may queue this task if it cannot
 	 *               immediately deploy it.
 	 * 
 	 * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
 	 * @throws NoResourceAvailableException Thrown is no queued scheduling is allowed and no resources are currently available.
 	 */
-	public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
-		if (scheduler == null) {
+	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
+		if (slotProvider == null) {
 			throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution.");
 		}
 
@@ -299,9 +299,8 @@ public class Execution {
 
 			// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
 			//     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
+			final SlotAllocationFuture future = slotProvider.allocateSlot(toSchedule, queued);
 			if (queued) {
-				SlotAllocationFuture future = scheduler.scheduleQueued(toSchedule);
-
 				future.setFutureAction(new SlotAllocationFutureAction() {
 					@Override
 					public void slotAllocated(SimpleSlot slot) {
@@ -319,7 +318,7 @@ public class Execution {
 				});
 			}
 			else {
-				SimpleSlot slot = scheduler.scheduleImmediately(toSchedule);
+				SimpleSlot slot = future.get();
 				try {
 					deployToSlot(slot);
 				}
@@ -560,7 +559,7 @@ public class Execution {
 					public Boolean call() throws Exception {
 						try {
 							consumerVertex.scheduleForExecution(
-									consumerVertex.getExecutionGraph().getScheduler(),
+									consumerVertex.getExecutionGraph().getSlotProvider(),
 									consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed());
 						} catch (Throwable t) {
 							consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 92cab41..585e9f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -47,7 +48,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
@@ -197,8 +197,8 @@ public class ExecutionGraph {
 
 	// ------ Fields that are relevant to the execution and need to be cleared before archiving  -------
 
-	/** The scheduler to use for scheduling new tasks as they are needed */
-	private Scheduler scheduler;
+	/** The slot provider to use for allocating slots for tasks as they are needed */
+	private SlotProvider slotProvider;
 
 	/** Strategy to use for restarts */
 	private RestartStrategy restartStrategy;
@@ -470,8 +470,8 @@ public class ExecutionGraph {
 		return jsonPlan;
 	}
 
-	public Scheduler getScheduler() {
-		return scheduler;
+	public SlotProvider getSlotProvider() {
+		return slotProvider;
 	}
 
 	public JobID getJobID() {
@@ -670,17 +670,17 @@ public class ExecutionGraph {
 		}
 	}
 
-	public void scheduleForExecution(Scheduler scheduler) throws JobException {
-		if (scheduler == null) {
+	public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
+		if (slotProvider == null) {
 			throw new IllegalArgumentException("Scheduler must not be null.");
 		}
 
-		if (this.scheduler != null && this.scheduler != scheduler) {
-			throw new IllegalArgumentException("Cannot use different schedulers for the same job");
+		if (this.slotProvider != null && this.slotProvider != slotProvider) {
+			throw new IllegalArgumentException("Cannot use different slot providers for the same job");
 		}
 
 		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
-			this.scheduler = scheduler;
+			this.slotProvider = slotProvider;
 
 			switch (scheduleMode) {
 
@@ -688,14 +688,14 @@ public class ExecutionGraph {
 					// simply take the vertices without inputs.
 					for (ExecutionJobVertex ejv : this.tasks.values()) {
 						if (ejv.getJobVertex().isInputVertex()) {
-							ejv.scheduleAll(scheduler, allowQueuedScheduling);
+							ejv.scheduleAll(slotProvider, allowQueuedScheduling);
 						}
 					}
 					break;
 
 				case EAGER:
 					for (ExecutionJobVertex ejv : getVerticesTopologically()) {
-						ejv.scheduleAll(scheduler, allowQueuedScheduling);
+						ejv.scheduleAll(slotProvider, allowQueuedScheduling);
 					}
 					break;
 
@@ -850,8 +850,8 @@ public class ExecutionGraph {
 					throw new IllegalStateException("Can only restart job from state restarting.");
 				}
 
-				if (scheduler == null) {
-					throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
+				if (slotProvider == null) {
+					throw new IllegalStateException("The execution graph has not been scheduled before - slotProvider is null.");
 				}
 
 				this.currentExecutions.clear();
@@ -885,7 +885,7 @@ public class ExecutionGraph {
 				}
 			}
 
-			scheduleForExecution(scheduler);
+			scheduleForExecution(slotProvider);
 		}
 		catch (Throwable t) {
 			fail(t);
@@ -917,7 +917,7 @@ public class ExecutionGraph {
 
 		// clear the non-serializable fields
 		restartStrategy = null;
-		scheduler = null;
+		slotProvider = null;
 		checkpointCoordinator = null;
 		executionContext = null;
 		kvStateLocationRegistry = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index d3dc8fe..1ac9522 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -37,7 +38,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.Preconditions;
@@ -289,12 +289,13 @@ public class ExecutionJobVertex {
 	//  Actions
 	//---------------------------------------------------------------------------------------------
 	
-	public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
+	public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
+		
 		ExecutionVertex[] vertices = this.taskVertices;
 
 		// kick off the tasks
 		for (ExecutionVertex ev : vertices) {
-			ev.scheduleForExecution(scheduler, queued);
+			ev.scheduleForExecution(slotProvider, queued);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 88e1b88..a8d5ee4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -40,12 +41,11 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+
 import org.slf4j.Logger;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -443,8 +443,8 @@ public class ExecutionVertex {
 		}
 	}
 
-	public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
-		return this.currentExecution.scheduleForExecution(scheduler, queued);
+	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
+		return this.currentExecution.scheduleForExecution(slotProvider, queued);
 	}
 
 	public void deployToSlot(SimpleSlot slot) throws JobException {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
new file mode 100644
index 0000000..b2c23a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
+
+/**
+ * The slot provider is responsible for preparing slots for ready-to-run tasks.
+ * 
+ * <p>It supports two allocating modes:
+ * <ul>
+ *     <li>Immediate allocating: A request for a task slot gets satisfied immediately; the allocated
+ *         slot can then be obtained by calling {@link SlotAllocationFuture#get()}.</li>
+ *     <li>Queued allocating: A request for a task slot is queued and returns a future that will be
+ *         fulfilled as soon as a slot becomes available.</li>
+ * </ul>
+ */
+public interface SlotProvider {
+
+	/**
+	 * Allocates a slot that meets the specific requirements of the given task.
+	 *
+	 * @param task         The task to allocate the slot for
+	 * @param allowQueued  Whether to allow the task to be queued if we do not have enough resources
+	 * @return The future of the allocation
+	 * 
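+	 * @throws NoResourceAvailableException Thrown if no slot can be allocated, e.g. if queueing is not allowed and no resources are currently available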
+	 */
+	SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean allowQueued) throws NoResourceAvailableException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 734972d..c9cdd00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.instance.SharedSlot;
@@ -65,7 +66,7 @@ import scala.concurrent.ExecutionContext;
  *         fulfilled as soon as a slot becomes available.</li>
  * </ul>
  */
-public class Scheduler implements InstanceListener, SlotAvailabilityListener {
+public class Scheduler implements InstanceListener, SlotAvailabilityListener, SlotProvider {
 
 	/** Scheduler-wide logger */
 	private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
@@ -129,30 +130,24 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	// ------------------------------------------------------------------------
 	//  Scheduling
 	// ------------------------------------------------------------------------
-	
-	public SimpleSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
-		Object ret = scheduleTask(task, false);
-		if (ret instanceof SimpleSlot) {
-			return (SimpleSlot) ret;
-		}
-		else {
-			throw new RuntimeException();
-		}
-	}
-	
-	public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResourceAvailableException {
-		Object ret = scheduleTask(task, true);
+
+
+	@Override
+	public SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean allowQueued) 
+			throws NoResourceAvailableException {
+
+		final Object ret = scheduleTask(task, allowQueued);
 		if (ret instanceof SimpleSlot) {
 			return new SlotAllocationFuture((SimpleSlot) ret);
 		}
-		if (ret instanceof SlotAllocationFuture) {
+		else if (ret instanceof SlotAllocationFuture) {
 			return (SlotAllocationFuture) ret;
 		}
 		else {
 			throw new RuntimeException();
 		}
 	}
-	
+
 	/**
 	 * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link SlotAllocationFuture}.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index d5520fd..aa5925f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
@@ -70,8 +71,8 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+
+import static org.mockito.Mockito.*;
 
 public class ExecutionGraphMetricsTest extends TestLogger {
 
@@ -135,7 +136,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 			when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
 			when(simpleSlot.getRoot()).thenReturn(rootSlot);
 
-			when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
+			when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean()))
+					.thenReturn(new SlotAllocationFuture(simpleSlot));
 
 			
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 5e9ee33..c576ce5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -18,25 +18,29 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.Test;
 
+import org.junit.Test;
 import org.mockito.Matchers;
 
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class ExecutionVertexSchedulingTest {
 
 	@Test
@@ -54,7 +58,8 @@ public class ExecutionVertexSchedulingTest {
 			assertTrue(slot.isReleased());
 
 			Scheduler scheduler = mock(Scheduler.class);
-			when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
+			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean()))
+				.thenReturn(new SlotAllocationFuture(slot));
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
@@ -86,7 +91,7 @@ public class ExecutionVertexSchedulingTest {
 			final SlotAllocationFuture future = new SlotAllocationFuture();
 
 			Scheduler scheduler = mock(Scheduler.class);
-			when(scheduler.scheduleQueued(Matchers.any(ScheduledUnit.class))).thenReturn(future);
+			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
@@ -117,7 +122,8 @@ public class ExecutionVertexSchedulingTest {
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			Scheduler scheduler = mock(Scheduler.class);
-			when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
+			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean()))
+				.thenReturn(new SlotAllocationFuture(slot));
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -130,4 +136,4 @@ public class ExecutionVertexSchedulingTest {
 			fail(e.getMessage());
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 870ae05..4cae7c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -54,12 +55,12 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.*;
 
 public class TerminalStateDeadlockTest {
-	
+
 	private final Field stateField;
 	private final Field resourceField;
 	private final Field execGraphStateField;
-	private final Field execGraphSchedulerField;
-	
+	private final Field execGraphSlotProviderField;
+
 	private final SimpleSlot resource;
 
 
@@ -75,8 +76,8 @@ public class TerminalStateDeadlockTest {
 			this.execGraphStateField = ExecutionGraph.class.getDeclaredField("state");
 			this.execGraphStateField.setAccessible(true);
 
-			this.execGraphSchedulerField = ExecutionGraph.class.getDeclaredField("scheduler");
-			this.execGraphSchedulerField.setAccessible(true);
+			this.execGraphSlotProviderField = ExecutionGraph.class.getDeclaredField("slotProvider");
+			this.execGraphSlotProviderField.setAccessible(true);
 			
 			// the dummy resource
 			ResourceID resourceId = ResourceID.generate();
@@ -96,11 +97,9 @@ public class TerminalStateDeadlockTest {
 			throw new RuntimeException();
 		}
 	}
-	
-	 
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	@Test
 	public void testProvokeDeadlock() {
 		try {
@@ -135,7 +134,7 @@ public class TerminalStateDeadlockTest {
 				initializeExecution(e2);
 
 				execGraphStateField.set(eg, JobStatus.FAILING);
-				execGraphSchedulerField.set(eg, scheduler);
+				execGraphSlotProviderField.set(eg, scheduler);
 				
 				Runnable r1 = new Runnable() {
 					@Override
@@ -173,12 +172,10 @@ public class TerminalStateDeadlockTest {
 	
 	static class TestExecGraph extends ExecutionGraph {
 
-		private static final long serialVersionUID = -7606144898417942044L;
-		
 		private static final Configuration EMPTY_CONFIG = new Configuration();
 
 		private static final FiniteDuration TIMEOUT = new FiniteDuration(30, TimeUnit.SECONDS);
-		
+
 		private volatile boolean done;
 
 		TestExecGraph(JobID jobId) throws IOException {
@@ -193,14 +190,14 @@ public class TerminalStateDeadlockTest {
 		}
 
 		@Override
-		public void scheduleForExecution(Scheduler scheduler) {
+		public void scheduleForExecution(SlotProvider slotProvider) {
 			// notify that we are done with the "restarting"
 			synchronized (this) {
 				done = true;
 				this.notifyAll();
 			}
 		}
-		
+
 		public void waitTillDone() {
 			try {
 				synchronized (this) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index eab4fea..b803702 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -63,18 +63,18 @@ public class ScheduleWithCoLocationHintTest {
 			CoLocationConstraint c6 = new CoLocationConstraint(ccg);
 
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2));
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4));
-			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1));
-			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2));
-			SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3));
-			SimpleSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5));
-			SimpleSlot s9 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6));
-			SimpleSlot s10 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4));
-			SimpleSlot s11 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5));
-			SimpleSlot s12 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false).get();
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false).get();
+			SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false).get();
+			SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false).get();
+			SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false).get();
+			SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false).get();
+			SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false).get();
 
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -109,22 +109,22 @@ public class ScheduleWithCoLocationHintTest {
 			assertEquals(s4.getTaskManagerID(), s10.getTaskManagerID());
 			assertEquals(s8.getTaskManagerID(), s11.getTaskManagerID());
 			assertEquals(s9.getTaskManagerID(), s12.getTaskManagerID());
-			
+
 			assertEquals(c1.getLocation(), s1.getTaskManagerLocation());
 			assertEquals(c2.getLocation(), s2.getTaskManagerLocation());
 			assertEquals(c3.getLocation(), s3.getTaskManagerLocation());
 			assertEquals(c4.getLocation(), s4.getTaskManagerLocation());
 			assertEquals(c5.getLocation(), s8.getTaskManagerLocation());
 			assertEquals(c6.getLocation(), s9.getTaskManagerLocation());
-			
+
 			// check the scheduler's bookkeeping
 			assertEquals(0, scheduler.getNumberOfAvailableSlots());
-			
+
 			// the first assignments are unconstrained, co.-scheduling is constrained
 			assertEquals(6, scheduler.getNumberOfLocalizedAssignments());
 			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
 			assertEquals(6, scheduler.getNumberOfUnconstrainedAssignments());
-			
+
 			// release some slots, be sure that new available ones come up
 			s1.releaseSlot();
 			s2.releaseSlot();
@@ -135,10 +135,11 @@ public class ScheduleWithCoLocationHintTest {
 			s11.releaseSlot();
 			s12.releaseSlot();
 			assertTrue(scheduler.getNumberOfAvailableSlots() >= 1);
-			
-			SimpleSlot single = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)));
+
+			SimpleSlot single = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)), false).get();
 			assertNotNull(single);
-			
+
 			s1.releaseSlot();
 			s2.releaseSlot();
 			s3.releaseSlot();
@@ -149,9 +150,9 @@ public class ScheduleWithCoLocationHintTest {
 			s9.releaseSlot();
 			s11.releaseSlot();
 			s12.releaseSlot();
-			
+
 			assertEquals(5, scheduler.getNumberOfAvailableSlots());
-			
+
 			assertEquals(6, scheduler.getNumberOfLocalizedAssignments());
 			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
 			assertEquals(7, scheduler.getNumberOfUnconstrainedAssignments());
@@ -161,7 +162,7 @@ public class ScheduleWithCoLocationHintTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void scheduleWithIntermediateRelease() {
 		try {
@@ -169,34 +170,37 @@ public class ScheduleWithCoLocationHintTest {
 			JobVertexID jid2 = new JobVertexID();
 			JobVertexID jid3 = new JobVertexID();
 			JobVertexID jid4 = new JobVertexID();
-			
+
 			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			
+
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
-			
+
 			scheduler.newInstanceAvailable(i1);
 			scheduler.newInstanceAvailable(i2);
-			
+
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
-			
+
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
 			CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
-			
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1));
-			
-			SimpleSlot sSolo = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 1)));
-			
+
+			SimpleSlot s1 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false).get();
+
+			SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false).get();
+
 			ResourceID taskManager = s1.getTaskManagerID();
-			
+
 			s1.releaseSlot();
 			s2.releaseSlot();
 			sSolo.releaseSlot();
-			
-			SimpleSlot sNew = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1));
+
+			SimpleSlot sNew = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
 			assertEquals(taskManager, sNew.getTaskManagerID());
-			
+
 			assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
 			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
 			assertEquals(2, scheduler.getNumberOfUnconstrainedAssignments());
@@ -206,41 +210,41 @@ public class ScheduleWithCoLocationHintTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void scheduleWithReleaseNoResource() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
 			JobVertexID jid3 = new JobVertexID();
-			
+
 			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			
+
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
-			
+
 			scheduler.newInstanceAvailable(i1);
 			scheduler.newInstanceAvailable(i2);
-			
+
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
-			
+
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
 			CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
-			
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
+
+			SimpleSlot s1 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get();
 			s1.releaseSlot();
-			
-			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1)));
-			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2)));
-			
-			
+
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false).get();
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false).get();
+
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
 				fail("Scheduled even though no resource was available.");
 			} catch (NoResourceAvailableException e) {
 				// expected
 			}
-			
+
 			assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
 			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
 			assertEquals(3, scheduler.getNumberOfUnconstrainedAssignments());
@@ -250,7 +254,7 @@ public class ScheduleWithCoLocationHintTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void scheduleMixedCoLocationSlotSharing() {
 		try {
@@ -276,27 +280,35 @@ public class ScheduleWithCoLocationHintTest {
 			SlotSharingGroup shareGroup = new SlotSharingGroup();
 
 			// first wave
-			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup));
-			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup));
-			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup));
-			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup));
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false);
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false);
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false);
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false);
 			
 			// second wave
-			SimpleSlot s21 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1));
-			SimpleSlot s22 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2));
-			SimpleSlot s23 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3));
-			SimpleSlot s24 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4));
+			SimpleSlot s21 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false).get();
+			SimpleSlot s22 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false).get();
+			SimpleSlot s23 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false).get();
+			SimpleSlot s24 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false).get();
 			
 			// third wave
-			SimpleSlot s31 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2));
-			SimpleSlot s32 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3));
-			SimpleSlot s33 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4));
-			SimpleSlot s34 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1));
-			
-			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup));
-			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup));
-			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup));
-			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup));
+			SimpleSlot s31 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false).get();
+			SimpleSlot s32 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false).get();
+			SimpleSlot s33 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false).get();
+			SimpleSlot s34 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false).get();
+			
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false);
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false);
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false);
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false);
 			
 			assertEquals(s21.getTaskManagerID(), s34.getTaskManagerID());
 			assertEquals(s22.getTaskManagerID(), s31.getTaskManagerID());
@@ -341,20 +353,26 @@ public class ScheduleWithCoLocationHintTest {
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
 			// schedule something into the shared group so that both instances are in the sharing group
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get();
 			
 			// schedule one locally to instance 1
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1));
+			SimpleSlot s3 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false).get();
 
 			// schedule with co location constraint (yet unassigned) and a preference for
 			// instance 1, but it can only get instance 2
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2));
+			SimpleSlot s4 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get();
 			
 			// schedule something into the assigned co-location constraints and check that they override the
 			// other preferences
-			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1));
-			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2));
+			SimpleSlot s5 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false).get();
+			SimpleSlot s6 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false).get();
 			
 			// check that each slot got three
 			assertEquals(3, s1.getRoot().getNumberLeaves());
@@ -386,13 +404,13 @@ public class ScheduleWithCoLocationHintTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSlotReleasedInBetween() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
-			
+
 			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 
 			Instance i1 = getRandomInstance(1);
@@ -403,36 +421,40 @@ public class ScheduleWithCoLocationHintTest {
 
 			scheduler.newInstanceAvailable(i2);
 			scheduler.newInstanceAvailable(i1);
-			
+
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
-			
+
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
-			
+
 			CoLocationGroup ccg = new CoLocationGroup();
 			CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2));
-			
+			SimpleSlot s1 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+
 			s1.releaseSlot();
 			s2.releaseSlot();
-			
+
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
 
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2));
-			
+			SimpleSlot s3 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get();
+
 			// still preserves the previous instance mapping)
 			assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID());
 			assertEquals(i2.getTaskManagerID(), s4.getTaskManagerID());
-			
+
 			s3.releaseSlot();
 			s4.releaseSlot();
 
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
-			
+
 			assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
 			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
 			assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
@@ -442,14 +464,14 @@ public class ScheduleWithCoLocationHintTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSlotReleasedInBetweenAndNoNewLocal() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
 			JobVertexID jidx = new JobVertexID();
-			
+
 			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
 
 			Instance i1 = getRandomInstance(1);
@@ -460,41 +482,46 @@ public class ScheduleWithCoLocationHintTest {
 
 			scheduler.newInstanceAvailable(i2);
 			scheduler.newInstanceAvailable(i1);
-			
+
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
-			
+
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
-			
+
 			CoLocationGroup ccg = new CoLocationGroup();
 			CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2));
-			
+			SimpleSlot s1 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+
 			s1.releaseSlot();
 			s2.releaseSlot();
-			
+
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
 
-			SimpleSlot sa = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)));
-			SimpleSlot sb = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)));
-			
+			SimpleSlot sa = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false).get();
+			SimpleSlot sb = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false).get();
+
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1));
+				scheduler.allocateSlot(
+						new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false);
 				fail("should not be able to find a resource");
 			} catch (NoResourceAvailableException e) {
 				// good
 			} catch (Exception e) {
 				fail("wrong exception");
 			}
-			
+
 			sa.releaseSlot();
 			sb.releaseSlot();
 
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
-			
+
 			assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
 			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
 			assertEquals(2, scheduler.getNumberOfUnconstrainedAssignments());
@@ -504,15 +531,15 @@ public class ScheduleWithCoLocationHintTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testScheduleOutOfOrder() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
-			
+
 			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			
+
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
 
@@ -520,11 +547,11 @@ public class ScheduleWithCoLocationHintTest {
 
 			scheduler.newInstanceAvailable(i2);
 			scheduler.newInstanceAvailable(i1);
-			
+
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
-			
+
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
-			
+
 			CoLocationGroup ccg = new CoLocationGroup();
 			CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
@@ -532,33 +559,37 @@ public class ScheduleWithCoLocationHintTest {
 			// schedule something from the second job vertex id before the first is filled,
 			// and give locality preferences that hint at using the same shared slot for both
 			// co location constraints (which we seek to prevent)
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2));
+			SimpleSlot s1 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false).get();
+
+			SimpleSlot s3 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false).get();
 
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2));
-			
 			// check that each slot got three
 			assertEquals(2, s1.getRoot().getNumberLeaves());
 			assertEquals(2, s2.getRoot().getNumberLeaves());
-			
+
 			assertEquals(s1.getTaskManagerID(), s3.getTaskManagerID());
 			assertEquals(s2.getTaskManagerID(), s4.getTaskManagerID());
-			
+
 			// check the scheduler's bookkeeping
 			assertEquals(0, scheduler.getNumberOfAvailableSlots());
-			
+
 			assertEquals(3, scheduler.getNumberOfLocalizedAssignments());
 			assertEquals(1, scheduler.getNumberOfNonLocalizedAssignments());
 			assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
-			
+
 			// release some slots, be sure that new available ones come up
 			s1.releaseSlot();
 			s2.releaseSlot();
 			s3.releaseSlot();
 			s4.releaseSlot();
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
-			
+
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
@@ -568,15 +599,15 @@ public class ScheduleWithCoLocationHintTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void nonColocationFollowsCoLocation() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
-			
+
 			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			
+
 			Instance i1 = getRandomInstance(1);
 			Instance i2 = getRandomInstance(1);
 
@@ -585,32 +616,36 @@ public class ScheduleWithCoLocationHintTest {
 
 			scheduler.newInstanceAvailable(i2);
 			scheduler.newInstanceAvailable(i1);
-			
+
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
-			
+
 			CoLocationGroup ccg = new CoLocationGroup();
 			CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2));
+			SimpleSlot s1 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+
+			SimpleSlot s3 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false).get();
 
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup));
-			
 			// check that each slot got two
 			assertEquals(2, s1.getRoot().getNumberLeaves());
 			assertEquals(2, s2.getRoot().getNumberLeaves());
-			
+
 			s1.releaseSlot();
 			s2.releaseSlot();
 			s3.releaseSlot();
 			s4.releaseSlot();
-			
+
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
-			
+
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 25498c4..d78f551 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -35,7 +35,6 @@ import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.a
 import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getDummyTask;
 import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
 import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -122,17 +121,17 @@ public class SchedulerIsolatedTasksTest {
 			assertEquals(5, scheduler.getNumberOfAvailableSlots());
 			
 			// schedule something into all slots
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
-			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
 			
 			// the slots should all be different
 			assertTrue(areAllDistinct(s1, s2, s3, s4, s5));
 			
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false);
 				fail("Scheduler accepted scheduling request without available resource.");
 			}
 			catch (NoResourceAvailableException e) {
@@ -145,8 +144,8 @@ public class SchedulerIsolatedTasksTest {
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
 			
 			// now we can schedule some more slots
-			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
-			SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
 			
 			assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7));
 			
@@ -245,7 +244,7 @@ public class SchedulerIsolatedTasksTest {
 			disposeThread.start();
 
 			for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
-				SlotAllocationFuture future = scheduler.scheduleQueued(new ScheduledUnit(getDummyTask()));
+				SlotAllocationFuture future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
 				future.setFutureAction(action);
 				allAllocatedSlots.add(future);
 			}
@@ -287,11 +286,11 @@ public class SchedulerIsolatedTasksTest {
 			scheduler.newInstanceAvailable(i3);
 			
 			List<SimpleSlot> slots = new ArrayList<SimpleSlot>();
-			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
-			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
-			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
-			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
-			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
 			
 			i2.markDead();
 			
@@ -312,7 +311,7 @@ public class SchedulerIsolatedTasksTest {
 			
 			// cannot get another slot, since all instances are dead
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
 				fail("Scheduler served a slot from a dead instance");
 			}
 			catch (NoResourceAvailableException e) {
@@ -347,7 +346,7 @@ public class SchedulerIsolatedTasksTest {
 			scheduler.newInstanceAvailable(i3);
 			
 			// schedule something on an arbitrary instance
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new Instance[0])));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false).get();
 			
 			// figure out how we use the location hints
 			Instance first = (Instance) s1.getOwner();
@@ -355,28 +354,28 @@ public class SchedulerIsolatedTasksTest {
 			Instance third = first == i3 ? i2 : i3;
 			
 			// something that needs to go to the first instance again
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())));
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false).get();
 			assertEquals(first, s2.getOwner());
 
 			// first or second --> second, because first is full
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, second)));
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false).get();
 			assertEquals(second, s3.getOwner());
 			
 			// first or third --> third (because first is full)
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, third)));
-			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, third)));
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
 			assertEquals(third, s4.getOwner());
 			assertEquals(third, s5.getOwner());
 			
 			// first or third --> second, because all others are full
-			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, third)));
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
 			assertEquals(second, s6.getOwner());
 			
 			// release something on the first and second instance
 			s2.releaseSlot();
 			s6.releaseSlot();
 			
-			SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, third)));
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
 			assertEquals(first, s7.getOwner());
 			
 			assertEquals(1, scheduler.getNumberOfUnconstrainedAssignments());