You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:28:27 UTC

[05/50] [abbrv] flink git commit: [FLINK-4459] [distributed runtime] Introduce SlotProvider for Scheduler

http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index fd0523b..c4121f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -62,10 +62,10 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(i2);
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup));
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -76,7 +76,7 @@ public class SchedulerSlotSharingTest {
 			
 			// we cannot schedule another task from the first vertex group
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -90,7 +90,7 @@ public class SchedulerSlotSharingTest {
 			s3.releaseSlot();
 			
 			// allocate another slot from that group
-			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup));
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get();
 			assertNotNull(s5);
 			
 			// release all old slots
@@ -98,9 +98,9 @@ public class SchedulerSlotSharingTest {
 			s2.releaseSlot();
 			s4.releaseSlot();
 			
-			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup));
-			SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup));
-			SimpleSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup));
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false).get();
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false).get();
+			SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false).get();
 			
 			assertNotNull(s6);
 			assertNotNull(s7);
@@ -135,7 +135,7 @@ public class SchedulerSlotSharingTest {
 	}
 	
 	@Test
-	public void scheduleImmediatelyWithSharing() {
+	public void allocateSlotWithSharing() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
@@ -147,10 +147,10 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup));
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -161,7 +161,7 @@ public class SchedulerSlotSharingTest {
 			
 			// we cannot schedule another task from the first vertex group
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -172,10 +172,10 @@ public class SchedulerSlotSharingTest {
 			}
 			
 			// schedule some tasks from the second ID group
-			SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup));
-			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup));
-			SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup));
-			SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup));
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false).get();
 			
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
@@ -184,7 +184,7 @@ public class SchedulerSlotSharingTest {
 			
 			// we cannot schedule another task from the second vertex group
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -205,7 +205,7 @@ public class SchedulerSlotSharingTest {
 			
 			// we can still not schedule anything from the second group of vertices
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -216,7 +216,7 @@ public class SchedulerSlotSharingTest {
 			}
 			
 			// we can schedule something from the first vertex group
-			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
 			assertNotNull(s5);
 			
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -226,7 +226,7 @@ public class SchedulerSlotSharingTest {
 			
 			// now we release a slot from the second vertex group and schedule another task from that group
 			s2_2.releaseSlot();
-			SimpleSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
+			SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
 			assertNotNull(s5_2);
 			
 			// release all slots
@@ -255,7 +255,7 @@ public class SchedulerSlotSharingTest {
 	}
 	
 	@Test
-	public void scheduleImmediatelyWithIntermediateTotallyEmptySharingGroup() {
+	public void allocateSlotWithIntermediateTotallyEmptySharingGroup() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
@@ -267,10 +267,10 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
 			
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
@@ -286,10 +286,10 @@ public class SchedulerSlotSharingTest {
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
 			
 			// schedule some tasks from the second ID group
-			SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
-			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
-			SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
-			SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
 
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
@@ -319,7 +319,7 @@ public class SchedulerSlotSharingTest {
 	}
 	
 	@Test
-	public void scheduleImmediatelyWithTemprarilyEmptyVertexGroup() {
+	public void allocateSlotWithTemprarilyEmptyVertexGroup() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
@@ -332,10 +332,10 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-			SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
-			SimpleSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-			SimpleSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
+			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
+			SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
+			SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
 			
 			assertNotNull(s1_1);
 			assertNotNull(s2_1);
@@ -345,10 +345,10 @@ public class SchedulerSlotSharingTest {
 			assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
 			
 			// schedule 4 tasks from the second vertex group
-			SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup));
-			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup));
-			SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup));
-			SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup));
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false).get();
 			
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
@@ -358,10 +358,10 @@ public class SchedulerSlotSharingTest {
 			assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
 			
 			// schedule 4 tasks from the third vertex group
-			SimpleSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup));
-			SimpleSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup));
-			SimpleSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup));
-			SimpleSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup));
+			SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false).get();
+			SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false).get();
+			SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false).get();
+			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false).get();
 			
 			assertNotNull(s1_3);
 			assertNotNull(s2_3);
@@ -373,7 +373,7 @@ public class SchedulerSlotSharingTest {
 			
 			// we cannot schedule another task from the second vertex group
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -389,9 +389,9 @@ public class SchedulerSlotSharingTest {
 			s3_2.releaseSlot();
 			s4_2.releaseSlot();
 			
-			SimpleSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup));
-			SimpleSlot s6_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup));
-			SimpleSlot s7_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup));
+			SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false).get();
+			SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false).get();
+			SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false).get();
 			
 			assertNotNull(s5_2);
 			assertNotNull(s6_2);
@@ -430,7 +430,7 @@ public class SchedulerSlotSharingTest {
 	}
 	
 	@Test
-	public void scheduleImmediatelyWithTemprarilyEmptyVertexGroup2() {
+	public void allocateSlotWithTemporarilyEmptyVertexGroup2() {
 		try {
 			JobVertexID jid1 = new JobVertexID();
 			JobVertexID jid2 = new JobVertexID();
@@ -442,9 +442,9 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 1 tasks from the first vertex group and 2 from the second
-			SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup));
-			SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup));
-			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup));
+			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false).get();
+			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false).get();
 			
 			assertNotNull(s1_1);
 			assertNotNull(s2_1);
@@ -460,7 +460,7 @@ public class SchedulerSlotSharingTest {
 			
 			
 			// this should free one slot so we can allocate one non-shared
-			SimpleSlot sx = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1)));
+			SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false).get();
 			assertNotNull(sx);
 			
 			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -495,28 +495,28 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule some individual vertices
-			SimpleSlot sA1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 0, 2)));
-			SimpleSlot sA2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 1, 2)));
+			SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false).get();
+			SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false).get();
 			assertNotNull(sA1);
 			assertNotNull(sA2);
 			
 			// schedule some vertices in the sharing group
-			SimpleSlot s1_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-			SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
-			SimpleSlot s2_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
-			SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
+			SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
+			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
+			SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
+			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
 			assertNotNull(s1_0);
 			assertNotNull(s1_1);
 			assertNotNull(s2_0);
 			assertNotNull(s2_1);
 			
 			// schedule another isolated vertex
-			SimpleSlot sB1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 1, 3)));
+			SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false).get();
 			assertNotNull(sB1);
 			
 			// should not be able to schedule more vertices
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -527,7 +527,7 @@ public class SchedulerSlotSharingTest {
 			}
 			
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -538,7 +538,7 @@ public class SchedulerSlotSharingTest {
 			}
 			
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 0, 3)));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false);
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -549,7 +549,7 @@ public class SchedulerSlotSharingTest {
 			}
 			
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 0, 1)));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false);
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (NoResourceAvailableException e) {
@@ -562,8 +562,8 @@ public class SchedulerSlotSharingTest {
 			// release some isolated task and check that the sharing group may grow
 			sA1.releaseSlot();
 			
-			SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
 			
@@ -575,19 +575,19 @@ public class SchedulerSlotSharingTest {
 			assertEquals(1, scheduler.getNumberOfAvailableSlots());
 			
 			// schedule one more no-shared task
-			SimpleSlot sB0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 0, 3)));
+			SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get();
 			assertNotNull(sB0);
 			
 			// release the last of the original shared slots and allocate one more non-shared slot
 			s2_1.releaseSlot();
-			SimpleSlot sB2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 2, 3)));
+			SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false).get();
 			assertNotNull(sB2);
 			
 			
 			// release on non-shared and add some shared slots
 			sA2.releaseSlot();
-			SimpleSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
-			SimpleSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
+			SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
+			SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
 			assertNotNull(s1_3);
 			assertNotNull(s2_3);
 			
@@ -597,8 +597,8 @@ public class SchedulerSlotSharingTest {
 			s1_3.releaseSlot();
 			s2_3.releaseSlot();
 			
-			SimpleSlot sC0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 1, 2)));
-			SimpleSlot sC1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 0, 2)));
+			SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false).get();
+			SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false).get();
 			assertNotNull(sC0);
 			assertNotNull(sC1);
 			
@@ -646,8 +646,8 @@ public class SchedulerSlotSharingTest {
 			
 			
 			// schedule one to each instance
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get();
 			assertNotNull(s1);
 			assertNotNull(s2);
 			
@@ -656,8 +656,8 @@ public class SchedulerSlotSharingTest {
 			assertEquals(1, i2.getNumberOfAvailableSlots());
 			
 			// schedule one from the other group to each instance
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup));
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get();
 			assertNotNull(s3);
 			assertNotNull(s4);
 			
@@ -699,8 +699,8 @@ public class SchedulerSlotSharingTest {
 			
 			
 			// schedule one to each instance
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get();
 			assertNotNull(s1);
 			assertNotNull(s2);
 			
@@ -709,8 +709,8 @@ public class SchedulerSlotSharingTest {
 			assertEquals(2, i2.getNumberOfAvailableSlots());
 			
 			// schedule one from the other group to each instance
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup));
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get();
 			assertNotNull(s3);
 			assertNotNull(s4);
 			
@@ -750,14 +750,14 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(i2);
 			
 			// schedule until the one instance is full
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup));
-			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup));
-			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false).get();
 
 			// schedule two more with preference of same instance --> need to go to other instance
-			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup));
-			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup));
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -803,19 +803,19 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// allocate something from group 1 and 2 interleaved with schedule for group 3
-			SimpleSlot slot_1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-			SimpleSlot slot_1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
+			SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
+			SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
 
-			SimpleSlot slot_2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
-			SimpleSlot slot_2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
+			SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
+			SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
 			
-			SimpleSlot slot_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup));
+			SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get();
 			
-			SimpleSlot slot_1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-			SimpleSlot slot_1_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+			SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
+			SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
 			
-			SimpleSlot slot_2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
-			SimpleSlot slot_2_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
+			SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
+			SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
 			
 			// release groups 1 and 2
 			
@@ -831,10 +831,10 @@ public class SchedulerSlotSharingTest {
 			
 			// allocate group 4
 			
-			SimpleSlot slot_4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup));
-			SimpleSlot slot_4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup));
-			SimpleSlot slot_4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup));
-			SimpleSlot slot_4_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup));
+			SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get();
+			SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get();
+			SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get();
+			SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
 			
 			// release groups 3 and 4
 			
@@ -885,7 +885,7 @@ public class SchedulerSlotSharingTest {
 					@Override
 					public void run() {
 						try {
-							SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup));
+							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false).get();
 
 							sleepUninterruptibly(rnd.nextInt(5));
 							slot.releaseSlot();
@@ -908,7 +908,7 @@ public class SchedulerSlotSharingTest {
 					public void run() {
 						try {
 							if (flag3.compareAndSet(false, true)) {
-								SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup));
+								SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get();
 								
 								sleepUninterruptibly(5);
 								
@@ -937,7 +937,7 @@ public class SchedulerSlotSharingTest {
 					@Override
 					public void run() {
 						try {
-							SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup));
+							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false).get();
 							
 							// wait a bit till scheduling the successor
 							sleepUninterruptibly(rnd.nextInt(5));
@@ -964,7 +964,7 @@ public class SchedulerSlotSharingTest {
 					@Override
 					public void run() {
 						try {
-							SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup));
+							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false).get();
 							
 							// wait a bit till scheduling the successor
 							sleepUninterruptibly(rnd.nextInt(5));
@@ -1039,27 +1039,27 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// schedule one task for the first and second vertex
-			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup));
-			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup));
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false).get();
 			
 			assertTrue(  s1.getParent() == s2.getParent() );
 			assertEquals(3, scheduler.getNumberOfAvailableSlots());
 			
-			SimpleSlot s3_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup));
-			SimpleSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup));
-			SimpleSlot s4_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup));
-			SimpleSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup));
+			SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false).get();
+			SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false).get();
+			SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get();
+			SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get();
 			
 			s1.releaseSlot();
 			s2.releaseSlot();
 			
-			SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup));
-			SimpleSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup));
-			SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup));
-			SimpleSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup));
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false).get();
+			SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get();
+			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
 			
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup));
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false);
 				fail("should throw an exception");
 			}
 			catch (NoResourceAvailableException e) {