You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/02 17:01:13 UTC

[2/4] flink git commit: [FLINK-7153] Introduce LocationPreferenceConstraint for scheduling

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index 9f4a675..2d35ce2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 
 public class ScheduleWithCoLocationHintTest extends TestLogger {
@@ -66,18 +67,18 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			CoLocationConstraint c6 = new CoLocationConstraint(ccg);
 
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false).get();
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false).get();
-			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false).get();
-			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false).get();
-			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false).get();
-			SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false).get();
-			SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false).get();
-			SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false).get();
-			SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false).get();
-			SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false, Collections.emptyList()).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false, Collections.emptyList()).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false, Collections.emptyList()).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false, Collections.emptyList()).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false, Collections.emptyList()).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false, Collections.emptyList()).get();
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false, Collections.emptyList()).get();
+			SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false, Collections.emptyList()).get();
+			SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false, Collections.emptyList()).get();
+			SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false, Collections.emptyList()).get();
+			SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false, Collections.emptyList()).get();
+			SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false, Collections.emptyList()).get();
 
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -140,7 +141,7 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			assertTrue(scheduler.getNumberOfAvailableSlots() >= 1);
 
 			SimpleSlot single = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)), false).get();
+					new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)), false, Collections.emptyList()).get();
 			assertNotNull(single);
 
 			s1.releaseSlot();
@@ -188,11 +189,11 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
 
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get();
+					new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
 			SimpleSlot s2 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false).get();
+					new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
 
-			SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false).get();
+			SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false, Collections.emptyList()).get();
 
 			ResourceID taskManager = s1.getTaskManagerID();
 
@@ -201,7 +202,7 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			sSolo.releaseSlot();
 
 			SimpleSlot sNew = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
+					new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
 			assertEquals(taskManager, sNew.getTaskManagerID());
 
 			assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
@@ -235,14 +236,14 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
 
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get();
+					new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
 			s1.releaseSlot();
 
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false).get();
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false).get();
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false, Collections.emptyList()).get();
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false, Collections.emptyList()).get();
 
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
 				fail("Scheduled even though no resource was available.");
 			} catch (ExecutionException e) {
 				assertTrue(e.getCause() instanceof NoResourceAvailableException);
@@ -283,35 +284,35 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			SlotSharingGroup shareGroup = new SlotSharingGroup();
 
 			// first wave
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false);
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false);
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false);
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false);
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false, Collections.emptyList());
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false, Collections.emptyList());
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false, Collections.emptyList());
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false, Collections.emptyList());
 			
 			// second wave
 			SimpleSlot s21 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false).get();
+					new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false, Collections.emptyList()).get();
 			SimpleSlot s22 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false).get();
+					new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false, Collections.emptyList()).get();
 			SimpleSlot s23 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false).get();
+					new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false, Collections.emptyList()).get();
 			SimpleSlot s24 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false).get();
+					new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false, Collections.emptyList()).get();
 			
 			// third wave
 			SimpleSlot s31 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false).get();
+					new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false, Collections.emptyList()).get();
 			SimpleSlot s32 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false).get();
+					new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false, Collections.emptyList()).get();
 			SimpleSlot s33 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false).get();
+					new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false, Collections.emptyList()).get();
 			SimpleSlot s34 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false).get();
+					new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false, Collections.emptyList()).get();
 			
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false);
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false);
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false);
-			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false);
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false, Collections.emptyList());
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false, Collections.emptyList());
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false, Collections.emptyList());
+			scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false, Collections.emptyList());
 			
 			assertEquals(s21.getTaskManagerID(), s34.getTaskManagerID());
 			assertEquals(s22.getTaskManagerID(), s31.getTaskManagerID());
@@ -357,25 +358,25 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 
 			// schedule something into the shared group so that both instances are in the sharing group
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
 			SimpleSlot s2 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
 			
 			// schedule one locally to instance 1
 			SimpleSlot s3 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
 
 			// schedule with co location constraint (yet unassigned) and a preference for
 			// instance 1, but it can only get instance 2
 			SimpleSlot s4 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
 			
 			// schedule something into the assigned co-location constraints and check that they override the
 			// other preferences
 			SimpleSlot s5 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get();
 			SimpleSlot s6 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
 			
 			// check that each slot got three
 			assertEquals(3, s1.getRoot().getNumberLeaves());
@@ -434,9 +435,9 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
 			SimpleSlot s2 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.singleton(loc2)).get();
 
 			s1.releaseSlot();
 			s2.releaseSlot();
@@ -445,9 +446,9 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
 
 			SimpleSlot s3 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get();
 			SimpleSlot s4 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
 
 			// still preserves the previous instance mapping)
 			assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID());
@@ -495,9 +496,9 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
 			SimpleSlot s2 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.singleton(loc2)).get();
 
 			s1.releaseSlot();
 			s2.releaseSlot();
@@ -506,13 +507,13 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
 
 			SimpleSlot sa = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false, Collections.emptyList()).get();
 			SimpleSlot sb = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false, Collections.emptyList()).get();
 
 			try {
 				scheduler.allocateSlot(
-						new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get();
+						new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get();
 				fail("should not be able to find a resource");
 			}
 			catch (ExecutionException e) {
@@ -565,14 +566,14 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			// and give locality preferences that hint at using the same shared slot for both
 			// co location constraints (which we seek to prevent)
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
 			SimpleSlot s2 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
 
 			SimpleSlot s3 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
 			SimpleSlot s4 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
 
 			// check that each slot got three
 			assertEquals(2, s1.getRoot().getNumberLeaves());
@@ -631,14 +632,14 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
 			SimpleSlot s1 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.emptyList()).get();
 			SimpleSlot s2 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.emptyList()).get();
 
 			SimpleSlot s3 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.emptyList()).get();
 			SimpleSlot s4 = scheduler.allocateSlot(
-					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false).get();
+					new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false, Collections.emptyList()).get();
 
 			// check that each slot got two
 			assertEquals(2, s1.getRoot().getNumberLeaves());

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index a05c1a3..7882f4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -123,17 +125,17 @@ public class SchedulerIsolatedTasksTest {
 			assertEquals(5, scheduler.getNumberOfAvailableSlots());
 			
 			// schedule something into all slots
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
-			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
 			
 			// the slots should all be different
 			assertTrue(areAllDistinct(s1, s2, s3, s4, s5));
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
 				fail("Scheduler accepted scheduling request without available resource.");
 			}
 			catch (ExecutionException e) {
@@ -146,8 +148,8 @@ public class SchedulerIsolatedTasksTest {
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
 			
 			// now we can schedule some more slots
-			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
-			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
 			
 			assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7));
 			
@@ -235,7 +237,7 @@ public class SchedulerIsolatedTasksTest {
 			disposeThread.start();
 
 			for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
-				CompletableFuture<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
+				CompletableFuture<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true, Collections.emptyList());
 				future.thenAcceptAsync(
 					(SimpleSlot slot) -> {
 						synchronized (toRelease) {
@@ -284,11 +286,11 @@ public class SchedulerIsolatedTasksTest {
 			scheduler.newInstanceAvailable(i3);
 			
 			List<SimpleSlot> slots = new ArrayList<SimpleSlot>();
-			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
-			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
-			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
-			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
-			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
+			slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
 			
 			i2.markDead();
 			
@@ -309,7 +311,7 @@ public class SchedulerIsolatedTasksTest {
 			
 			// cannot get another slot, since all instances are dead
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
 				fail("Scheduler served a slot from a dead instance");
 			}
 			catch (ExecutionException e) {
@@ -344,7 +346,7 @@ public class SchedulerIsolatedTasksTest {
 			scheduler.newInstanceAvailable(i3);
 			
 			// schedule something on an arbitrary instance
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false, Collections.emptyList()).get();
 			
 			// figure out how we use the location hints
 			Instance first = (Instance) s1.getOwner();
@@ -352,28 +354,28 @@ public class SchedulerIsolatedTasksTest {
 			Instance third = first == i3 ? i2 : i3;
 			
 			// something that needs to go to the first instance again
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false, Collections.singleton(s1.getTaskManagerLocation())).get();
 			assertEquals(first, s2.getOwner());
 
 			// first or second --> second, because first is full
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false, Arrays.asList(first.getTaskManagerLocation(), second.getTaskManagerLocation())).get();
 			assertEquals(second, s3.getOwner());
 			
 			// first or third --> third (because first is full)
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
-			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
 			assertEquals(third, s4.getOwner());
 			assertEquals(third, s5.getOwner());
 			
 			// first or third --> second, because all others are full
-			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
 			assertEquals(second, s6.getOwner());
 			
 			// release something on the first and second instance
 			s2.releaseSlot();
 			s6.releaseSlot();
 			
-			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
 			assertEquals(first, s7.getOwner());
 			
 			assertEquals(1, scheduler.getNumberOfUnconstrainedAssignments());

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index 1f88dd8..a478eb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -64,10 +65,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(i2);
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false).get();
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -78,7 +79,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// we cannot schedule another task from the first vertex group
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -92,7 +93,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s3.releaseSlot();
 			
 			// allocate another slot from that group
-			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get();
 			assertNotNull(s5);
 			
 			// release all old slots
@@ -100,9 +101,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s2.releaseSlot();
 			s4.releaseSlot();
 			
-			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false).get();
-			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false).get();
-			SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s6);
 			assertNotNull(s7);
@@ -149,10 +150,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false).get();
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -163,7 +164,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// we cannot schedule another task from the first vertex group
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -174,10 +175,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			// schedule some tasks from the second ID group
-			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false).get();
-			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false).get();
-			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false).get();
-			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false).get();
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
@@ -186,7 +187,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// we cannot schedule another task from the second vertex group
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -207,7 +208,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// we can still not schedule anything from the second group of vertices
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -218,7 +219,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			// we can schedule something from the first vertex group
-			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 			assertNotNull(s5);
 			
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -228,7 +229,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// now we release a slot from the second vertex group and schedule another task from that group
 			s2_2.releaseSlot();
-			SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
+			SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 			assertNotNull(s5_2);
 			
 			// release all slots
@@ -269,10 +270,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
@@ -288,10 +289,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
 			
 			// schedule some tasks from the second ID group
-			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
-			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
-			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
-			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
@@ -334,10 +335,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
-			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
-			SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
-			SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
+			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_1);
 			assertNotNull(s2_1);
@@ -347,10 +348,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
 			
 			// schedule 4 tasks from the second vertex group
-			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false).get();
-			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false).get();
-			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false).get();
-			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false).get();
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
@@ -360,10 +361,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
 			
 			// schedule 4 tasks from the third vertex group
-			SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false).get();
-			SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false).get();
-			SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false).get();
-			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false).get();
+			SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_3);
 			assertNotNull(s2_3);
@@ -375,7 +376,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// we cannot schedule another task from the second vertex group
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -391,9 +392,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s3_2.releaseSlot();
 			s4_2.releaseSlot();
 			
-			SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false).get();
-			SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false).get();
-			SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false).get();
+			SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s5_2);
 			assertNotNull(s6_2);
@@ -444,9 +445,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 1 tasks from the first vertex group and 2 from the second
-			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false).get();
-			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false).get();
-			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false).get();
+			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_1);
 			assertNotNull(s2_1);
@@ -462,7 +463,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			
 			// this should free one slot so we can allocate one non-shared
-			SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false).get();
+			SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false, Collections.emptyList()).get();
 			assertNotNull(sx);
 			
 			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -497,28 +498,28 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule some individual vertices
-			SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false).get();
-			SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false).get();
+			SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false, Collections.emptyList()).get();
+			SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false, Collections.emptyList()).get();
 			assertNotNull(sA1);
 			assertNotNull(sA2);
 			
 			// schedule some vertices in the sharing group
-			SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
-			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
-			SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
-			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
+			SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
 			assertNotNull(s1_0);
 			assertNotNull(s1_1);
 			assertNotNull(s2_0);
 			assertNotNull(s2_1);
 			
 			// schedule another isolated vertex
-			SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false).get();
+			SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false, Collections.emptyList()).get();
 			assertNotNull(sB1);
 			
 			// should not be able to schedule more vertices
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -529,7 +530,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -540,7 +541,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -551,7 +552,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -564,8 +565,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			// release some isolated task and check that the sharing group may grow
 			sA1.releaseSlot();
 			
-			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
-			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
+			SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
 			
@@ -577,19 +578,19 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertEquals(1, scheduler.getNumberOfAvailableSlots());
 			
 			// schedule one more no-shared task
-			SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get();
+			SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get();
 			assertNotNull(sB0);
 			
 			// release the last of the original shared slots and allocate one more non-shared slot
 			s2_1.releaseSlot();
-			SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false).get();
+			SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false, Collections.emptyList()).get();
 			assertNotNull(sB2);
 			
 			
 			// release on non-shared and add some shared slots
 			sA2.releaseSlot();
-			SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
-			SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
+			SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
 			assertNotNull(s1_3);
 			assertNotNull(s2_3);
 			
@@ -599,8 +600,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s1_3.releaseSlot();
 			s2_3.releaseSlot();
 			
-			SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false).get();
-			SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false).get();
+			SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false, Collections.emptyList()).get();
+			SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false, Collections.emptyList()).get();
 			assertNotNull(sC0);
 			assertNotNull(sC1);
 			
@@ -648,8 +649,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			
 			// schedule one to each instance
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
 			assertNotNull(s1);
 			assertNotNull(s2);
 			
@@ -658,8 +659,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertEquals(1, i2.getNumberOfAvailableSlots());
 			
 			// schedule one from the other group to each instance
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
 			assertNotNull(s3);
 			assertNotNull(s4);
 			
@@ -701,8 +702,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			
 			// schedule one to each instance
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
 			assertNotNull(s1);
 			assertNotNull(s2);
 			
@@ -711,8 +712,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertEquals(2, i2.getNumberOfAvailableSlots());
 			
 			// schedule one from the other group to each instance
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
 			assertNotNull(s3);
 			assertNotNull(s4);
 			
@@ -752,14 +753,14 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(i2);
 			
 			// schedule until the one instance is full
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get();
-			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false).get();
-			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
 
 			// schedule two more with preference of same instance --> need to go to other instance
-			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false).get();
-			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false).get();
+			SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -805,19 +806,19 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// allocate something from group 1 and 2 interleaved with schedule for group 3
-			SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
-			SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
+			SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
 
-			SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
-			SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
+			SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
 			
-			SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get();
+			SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get();
 			
-			SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
-			SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
+			SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
-			SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
-			SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
+			SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			// release groups 1 and 2
 			
@@ -833,10 +834,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// allocate group 4
 			
-			SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get();
-			SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get();
-			SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get();
-			SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
+			SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			// release groups 3 and 4
 			
@@ -887,7 +888,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 					@Override
 					public void run() {
 						try {
-							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false).get();
+							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
 
 							sleepUninterruptibly(rnd.nextInt(5));
 							slot.releaseSlot();
@@ -910,7 +911,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 					public void run() {
 						try {
 							if (flag3.compareAndSet(false, true)) {
-								SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get();
+								SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get();
 								
 								sleepUninterruptibly(5);
 								
@@ -939,7 +940,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 					@Override
 					public void run() {
 						try {
-							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false).get();
+							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
 							
 							// wait a bit till scheduling the successor
 							sleepUninterruptibly(rnd.nextInt(5));
@@ -966,7 +967,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 					@Override
 					public void run() {
 						try {
-							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false).get();
+							SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
 							
 							// wait a bit till scheduling the successor
 							sleepUninterruptibly(rnd.nextInt(5));
@@ -1041,27 +1042,27 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// schedule one task for the first and second vertex
-			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false).get();
-			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false).get();
+			SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false, Collections.emptyList()).get();
 			
 			assertTrue(  s1.getParent() == s2.getParent() );
 			assertEquals(3, scheduler.getNumberOfAvailableSlots());
 			
-			SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false).get();
-			SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false).get();
-			SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get();
-			SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get();
+			SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			s1.releaseSlot();
 			s2.releaseSlot();
 			
-			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false).get();
-			SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false).get();
-			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get();
-			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
+			SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get();
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false, Collections.emptyList()).get();
 				fail("should throw an exception");
 			}
 			catch (ExecutionException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index c7d0f09..98dca03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -106,20 +107,26 @@ public class SchedulerTestUtils {
 	
 	
 	public static Execution getTestVertex(Iterable<TaskManagerLocation> preferredLocations) {
-		ExecutionVertex vertex = mock(ExecutionVertex.class);
-
 		Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = new ArrayList<>(4);
 
 		for (TaskManagerLocation preferredLocation : preferredLocations) {
 			preferredLocationFutures.add(CompletableFuture.completedFuture(preferredLocation));
 		}
+
+		return getTestVertex(preferredLocationFutures);
+	}
+
+	public static Execution getTestVertex(Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures) {
+		ExecutionVertex vertex = mock(ExecutionVertex.class);
+
 		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
-		
+
 		Execution execution = mock(Execution.class);
 		when(execution.getVertex()).thenReturn(vertex);
-		
+		when(execution.calculatePreferredLocations(any(LocationPreferenceConstraint.class))).thenCallRealMethod();
+
 		return execution;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
new file mode 100644
index 0000000..60dddbb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import java.net.InetAddress;
+
+/**
+ * Dummy local task manager location for testing purposes.
+ */
+public class LocalTaskManagerLocation extends TaskManagerLocation {
+
+	private static final long serialVersionUID = 2396142513336559461L;
+
+	public LocalTaskManagerLocation() {
+		super(ResourceID.generate(), InetAddress.getLoopbackAddress(), -1);
+	}
+}