You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 16:42:21 UTC

[04/11] flink git commit: [FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index 41a7f02..025c795 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.Collections;
 import java.util.Random;
@@ -49,7 +50,12 @@ import static org.junit.Assert.fail;
 /**
  * Tests for the scheduler when scheduling tasks in slot sharing groups.
  */
-public class SchedulerSlotSharingTest extends TestLogger {
+@RunWith(Parameterized.class)
+public class SchedulerSlotSharingTest extends SchedulerTestBase {
+
+	public SchedulerSlotSharingTest(SchedulerType schedulerType) {
+		super(schedulerType);
+	}
 
 	@Test
 	public void scheduleSingleVertexType() {
@@ -57,18 +63,15 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			JobVertexID jid1 = new JobVertexID();
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1);
-			
-			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			Instance i1 = getRandomInstance(2);
-			Instance i2 = getRandomInstance(2);
-			scheduler.newInstanceAvailable(i1);
-			scheduler.newInstanceAvailable(i2);
+
+			final ResourceID tm1ResourceId = testingSlotProvider.addTaskManager(2).getResourceID();
+			testingSlotProvider.addTaskManager(2);
 			
 			// schedule 4 tasks from the first vertex group
-			LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -79,7 +82,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// we cannot schedule another task from the first vertex group
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get();
+				testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -93,7 +96,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s3.releaseSlot();
 			
 			// allocate another slot from that group
-			LogicalSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			assertNotNull(s5);
 			
 			// release all old slots
@@ -101,9 +104,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s2.releaseSlot();
 			s4.releaseSlot();
 			
-			LogicalSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s8 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
 			assertNotNull(s6);
 			assertNotNull(s7);
@@ -111,10 +114,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// make sure we have two slots on the first instance, and two on the second
 			int c = 0;
-			c += (s5.getTaskManagerLocation().equals(i1.getTaskManagerLocation())) ? 1 : -1;
-			c += (s6.getTaskManagerLocation().equals(i1.getTaskManagerLocation())) ? 1 : -1;
-			c += (s7.getTaskManagerLocation().equals(i1.getTaskManagerLocation())) ? 1 : -1;
-			c += (s8.getTaskManagerLocation().equals(i1.getTaskManagerLocation())) ? 1 : -1;
+			c += (s5.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
+			c += (s6.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
+			c += (s7.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
+			c += (s8.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
 			assertEquals(0, c);
 			
 			// release all
@@ -124,12 +127,12 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s8.releaseSlot();
 			
 			// test that everything is released
-			assertEquals(4, scheduler.getNumberOfAvailableSlots());
+			assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
 
 			// check the scheduler's bookkeeping
-			assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
-			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
-			assertEquals(8, scheduler.getNumberOfUnconstrainedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+			assertEquals(8, testingSlotProvider.getNumberOfUnconstrainedAssignments());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -138,123 +141,116 @@ public class SchedulerSlotSharingTest extends TestLogger {
 	}
 	
 	@Test
-	public void allocateSlotWithSharing() {
+	public void allocateSlotWithSharing() throws Exception {
+		JobVertexID jid1 = new JobVertexID();
+		JobVertexID jid2 = new JobVertexID();
+
+		SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
+
+		testingSlotProvider.addTaskManager(2);
+		testingSlotProvider.addTaskManager(2);
+
+		// schedule 4 tasks from the first vertex group
+		LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+		LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+		LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+		LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+
+		assertNotNull(s1);
+		assertNotNull(s2);
+		assertNotNull(s3);
+		assertNotNull(s4);
+
+		assertTrue(areAllDistinct(s1, s2, s3, s4));
+
+		// we cannot schedule another task from the first vertex group
 		try {
-			JobVertexID jid1 = new JobVertexID();
-			JobVertexID jid2 = new JobVertexID();
-			
-			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-			
-			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			scheduler.newInstanceAvailable(getRandomInstance(2));
-			scheduler.newInstanceAvailable(getRandomInstance(2));
-			
-			// schedule 4 tasks from the first vertex group
-			LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false, Collections.emptyList()).get();
-			
-			assertNotNull(s1);
-			assertNotNull(s2);
-			assertNotNull(s3);
-			assertNotNull(s4);
-			
-			assertTrue(areAllDistinct(s1, s2, s3, s4));
-			
-			// we cannot schedule another task from the first vertex group
-			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
-				fail("Scheduler accepted too many tasks at the same time");
-			}
-			catch (ExecutionException e) {
-				assertTrue(e.getCause() instanceof NoResourceAvailableException);
-			}
-			catch (Exception e) {
-				fail("Wrong exception.");
-			}
-			
-			// schedule some tasks from the second ID group
-			LogicalSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false, Collections.emptyList()).get();
-			
-			assertNotNull(s1_2);
-			assertNotNull(s2_2);
-			assertNotNull(s3_2);
-			assertNotNull(s4_2);
-			
-			// we cannot schedule another task from the second vertex group
-			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
-				fail("Scheduler accepted too many tasks at the same time");
-			}
-			catch (ExecutionException e) {
-				assertTrue(e.getCause() instanceof NoResourceAvailableException);
-			}
-			catch (Exception e) {
-				fail("Wrong exception.");
-			}
-			
-			// now, we release some vertices (sub-slots) from the first group.
-			// that should allow us to schedule more vertices from the first group
-			s1.releaseSlot();
-			s4.releaseSlot();
-			
-			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
-			
-			// we can still not schedule anything from the second group of vertices
-			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
-				fail("Scheduler accepted too many tasks at the same time");
-			}
-			catch (ExecutionException e) {
-				assertTrue(e.getCause() instanceof NoResourceAvailableException);
-			}
-			catch (Exception e) {
-				fail("Wrong exception.");
-			}
-			
-			// we can schedule something from the first vertex group
-			LogicalSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
-			assertNotNull(s5);
-			
-			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
-			
-			
-			// now we release a slot from the second vertex group and schedule another task from that group
-			s2_2.releaseSlot();
-			LogicalSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
-			assertNotNull(s5_2);
-			
-			// release all slots
-			s2.releaseSlot();
-			s3.releaseSlot();
-			s5.releaseSlot();
-			
-			s1_2.releaseSlot();
-			s3_2.releaseSlot();
-			s4_2.releaseSlot();
-			s5_2.releaseSlot();
-			
-			// test that everything is released
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(4, scheduler.getNumberOfAvailableSlots());
-			
-			// check the scheduler's bookkeeping
-			assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
-			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
-			assertEquals(10, scheduler.getNumberOfUnconstrainedAssignments());
+			testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			fail("Scheduler accepted too many tasks at the same time");
+		}
+		catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof NoResourceAvailableException);
 		}
 		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+			fail("Wrong exception.");
+		}
+
+		// schedule some tasks from the second ID group
+		LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+		LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+		LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+		LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+
+		assertNotNull(s1_2);
+		assertNotNull(s2_2);
+		assertNotNull(s3_2);
+		assertNotNull(s4_2);
+
+		// we cannot schedule another task from the second vertex group
+		try {
+			testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			fail("Scheduler accepted too many tasks at the same time");
+		}
+		catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof NoResourceAvailableException);
+		}
+		catch (Exception e) {
+			fail("Wrong exception.");
+		}
+
+		// now, we release some vertices (sub-slots) from the first group.
+		// that should allow us to schedule more vertices from the first group
+		s1.releaseSlot();
+		s4.releaseSlot();
+
+		assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
+		assertEquals(2, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+		assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
+
+		// we can still not schedule anything from the second group of vertices
+		try {
+			testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			fail("Scheduler accepted too many tasks at the same time");
 		}
+		catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof NoResourceAvailableException);
+		}
+		catch (Exception e) {
+			fail("Wrong exception.");
+		}
+
+		// we can schedule something from the first vertex group
+		LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+		assertNotNull(s5);
+
+		assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
+		assertEquals(1, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+		assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
+
+
+		// now we release a slot from the second vertex group and schedule another task from that group
+		s2_2.releaseSlot();
+		LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+		assertNotNull(s5_2);
+
+		// release all slots
+		s2.releaseSlot();
+		s3.releaseSlot();
+		s5.releaseSlot();
+
+		s1_2.releaseSlot();
+		s3_2.releaseSlot();
+		s4_2.releaseSlot();
+		s5_2.releaseSlot();
+
+		// test that everything is released
+		assertEquals(0, testingSlotProvider.getNumberOfSlots(sharingGroup));
+		assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
+
+		// check the scheduler's bookkeeping
+		assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
+		assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+		assertEquals(10, testingSlotProvider.getNumberOfUnconstrainedAssignments());
 	}
 	
 	@Test
@@ -264,56 +260,55 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			JobVertexID jid2 = new JobVertexID();
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-			
-			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			scheduler.newInstanceAvailable(getRandomInstance(2));
-			scheduler.newInstanceAvailable(getRandomInstance(2));
-			
+
+			testingSlotProvider.addTaskManager(2);
+			testingSlotProvider.addTaskManager(2);
+
 			// schedule 4 tasks from the first vertex group
-			LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
-			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
-			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
+			assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
+			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+			assertEquals(4, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
 			
 			s1.releaseSlot();
 			s2.releaseSlot();
 			s3.releaseSlot();
 			s4.releaseSlot();
 			
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
+			assertEquals(0, testingSlotProvider.getNumberOfSlots(sharingGroup));
+			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
 			
 			// schedule some tasks from the second ID group
-			LogicalSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 
-			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
+			assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
+			assertEquals(4, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
 			
 			s1_2.releaseSlot();
 			s2_2.releaseSlot();
 			s3_2.releaseSlot();
 			s4_2.releaseSlot();
 			
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
+			assertEquals(0, testingSlotProvider.getNumberOfSlots(sharingGroup));
+			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
 
 			// test that everything is released
-			assertEquals(4, scheduler.getNumberOfAvailableSlots());
+			assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
 			
 			// check the scheduler's bookkeeping
-			assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
-			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
-			assertEquals(8, scheduler.getNumberOfUnconstrainedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+			assertEquals(8, testingSlotProvider.getNumberOfUnconstrainedAssignments());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -329,16 +324,15 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			JobVertexID jid3 = new JobVertexID();
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3);
-			
-			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			scheduler.newInstanceAvailable(getRandomInstance(2));
-			scheduler.newInstanceAvailable(getRandomInstance(2));
-			
+
+			testingSlotProvider.addTaskManager(2);
+			testingSlotProvider.addTaskManager(2);
+
 			// schedule 4 tasks from the first vertex group
-			LogicalSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s3_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s4_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_1);
 			assertNotNull(s2_1);
@@ -348,10 +342,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
 			
 			// schedule 4 tasks from the second vertex group
-			LogicalSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
@@ -361,10 +355,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
 			
 			// schedule 4 tasks from the third vertex group
-			LogicalSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s3_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s4_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_3);
 			assertNotNull(s2_3);
@@ -376,7 +370,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// we cannot schedule another task from the second vertex group
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
+				testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -392,9 +386,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s3_2.releaseSlot();
 			s4_2.releaseSlot();
 			
-			LogicalSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s6_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s7_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
 			assertNotNull(s5_2);
 			assertNotNull(s6_2);
@@ -411,7 +405,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s7_2.releaseSlot();
 			
 			// test that everything is released
-			assertEquals(0, scheduler.getNumberOfAvailableSlots());
+			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlots());
 			
 			s1_3.releaseSlot();
 			s2_3.releaseSlot();
@@ -419,12 +413,12 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s4_3.releaseSlot();
 			
 			// test that everything is released
-			assertEquals(4, scheduler.getNumberOfAvailableSlots());
+			assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
 			
 			// check the scheduler's bookkeeping
-			assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
-			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
-			assertEquals(15, scheduler.getNumberOfUnconstrainedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+			assertEquals(15, testingSlotProvider.getNumberOfUnconstrainedAssignments());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -440,22 +434,21 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			JobVertexID jid3 = new JobVertexID();
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-			
-			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			scheduler.newInstanceAvailable(getRandomInstance(2));
-			
+
+			testingSlotProvider.addTaskManager(2);
+
 			// schedule 1 tasks from the first vertex group and 2 from the second
-			LogicalSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
 			assertNotNull(s1_1);
 			assertNotNull(s2_1);
 			assertNotNull(s2_2);
 			
-			assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
+			assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
+			assertEquals(1, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
 			
 			// release the two from the second
 			s2_1.releaseSlot();
@@ -463,17 +456,17 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			
 			// this should free one slot so we can allocate one non-shared
-			LogicalSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false, Collections.emptyList()).get();
+			LogicalSlot sx = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, null)), false, Collections.emptyList()).get();
 			assertNotNull(sx);
 			
-			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
-			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
+			assertEquals(1, testingSlotProvider.getNumberOfSlots(sharingGroup));
+			assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+			assertEquals(1, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
 			
 			// check the scheduler's bookkeeping
-			assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
-			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
-			assertEquals(4, scheduler.getNumberOfUnconstrainedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+			assertEquals(4, testingSlotProvider.getNumberOfUnconstrainedAssignments());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -492,34 +485,33 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			JobVertexID jidC = new JobVertexID();
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-			
-			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			scheduler.newInstanceAvailable(getRandomInstance(3));
-			scheduler.newInstanceAvailable(getRandomInstance(2));
-			
+
+			testingSlotProvider.addTaskManager(3);
+			testingSlotProvider.addTaskManager(2);
+
 			// schedule some individual vertices
-			LogicalSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false, Collections.emptyList()).get();
-			LogicalSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false, Collections.emptyList()).get();
+			LogicalSlot sA2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2, null)), false, Collections.emptyList()).get();
+			LogicalSlot sA1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2, null)), false, Collections.emptyList()).get();
 			assertNotNull(sA1);
 			assertNotNull(sA2);
 			
 			// schedule some vertices in the sharing group
-			LogicalSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s1_0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2_0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			assertNotNull(s1_0);
 			assertNotNull(s1_1);
 			assertNotNull(s2_0);
 			assertNotNull(s2_1);
 			
 			// schedule another isolated vertex
-			LogicalSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false, Collections.emptyList()).get();
+			LogicalSlot sB1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3, null)), false, Collections.emptyList()).get();
 			assertNotNull(sB1);
 			
 			// should not be able to schedule more vertices
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+				testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -530,7 +522,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+				testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -541,7 +533,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get();
+				testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -552,7 +544,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			}
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false, Collections.emptyList()).get();
+				testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1, null)), false, Collections.emptyList()).get();
 				fail("Scheduler accepted too many tasks at the same time");
 			}
 			catch (ExecutionException e) {
@@ -565,8 +557,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			// release some isolated task and check that the sharing group may grow
 			sA1.releaseSlot();
 			
-			LogicalSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
 			
@@ -575,22 +567,22 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s1_1.releaseSlot();
 			s2_0.releaseSlot();
 			
-			assertEquals(1, scheduler.getNumberOfAvailableSlots());
+			assertEquals(1, testingSlotProvider.getNumberOfAvailableSlots());
 			
 			// schedule one more no-shared task
-			LogicalSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get();
+			LogicalSlot sB0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, Collections.emptyList()).get();
 			assertNotNull(sB0);
 			
 			// release the last of the original shared slots and allocate one more non-shared slot
 			s2_1.releaseSlot();
-			LogicalSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false, Collections.emptyList()).get();
+			LogicalSlot sB2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3, null)), false, Collections.emptyList()).get();
 			assertNotNull(sB2);
 			
 			
 			// release on non-shared and add some shared slots
 			sA2.releaseSlot();
-			LogicalSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			assertNotNull(s1_3);
 			assertNotNull(s2_3);
 			
@@ -600,8 +592,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			s1_3.releaseSlot();
 			s2_3.releaseSlot();
 			
-			LogicalSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false, Collections.emptyList()).get();
-			LogicalSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false, Collections.emptyList()).get();
+			LogicalSlot sC0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2, null)), false, Collections.emptyList()).get();
+			LogicalSlot sC1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2, null)), false, Collections.emptyList()).get();
 			assertNotNull(sC0);
 			assertNotNull(sC1);
 			
@@ -613,12 +605,12 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			sC1.releaseSlot();
 			
 			// test that everything is released
-			assertEquals(5, scheduler.getNumberOfAvailableSlots());
+			assertEquals(5, testingSlotProvider.getNumberOfAvailableSlots());
 			
 			// check the scheduler's bookkeeping
-			assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
-			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
-			assertEquals(15, scheduler.getNumberOfUnconstrainedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+			assertEquals(15, testingSlotProvider.getNumberOfUnconstrainedAssignments());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -637,41 +629,34 @@ public class SchedulerSlotSharingTest extends TestLogger {
 
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
 
-			Instance i1 = getRandomInstance(2);
-			Instance i2 = getRandomInstance(2);
-
-			TaskManagerLocation loc1 = i1.getTaskManagerLocation();
-			TaskManagerLocation loc2 = i2.getTaskManagerLocation();
-
-			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			scheduler.newInstanceAvailable(i1);
-			scheduler.newInstanceAvailable(i2);
-			
+			TaskManagerLocation loc1 = testingSlotProvider.addTaskManager(2);
+			TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2);
 			
 			// schedule one to each instance
-			LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
-			LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
+			LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+			LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2)).get();
 			assertNotNull(s1);
 			assertNotNull(s2);
 			
-			assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(1, i1.getNumberOfAvailableSlots());
-			assertEquals(1, i2.getNumberOfAvailableSlots());
+			assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
+			assertEquals(loc1, s1.getTaskManagerLocation());
+			assertEquals(loc2, s2.getTaskManagerLocation());
 			
 			// schedule one from the other group to each instance
-			LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
-			LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
+			LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+			LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2)).get();
 			assertNotNull(s3);
 			assertNotNull(s4);
 			
-			assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(1, i1.getNumberOfAvailableSlots());
-			assertEquals(1, i2.getNumberOfAvailableSlots());
+			assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
+			assertEquals(loc1, s3.getTaskManagerLocation());
+			assertEquals(loc2, s4.getTaskManagerLocation());
+			assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots());
 			
 			// check the scheduler's bookkeeping
-			assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
-			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
-			assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
+			assertEquals(4, testingSlotProvider.getNumberOfLocalizedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfUnconstrainedAssignments());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -690,41 +675,33 @@ public class SchedulerSlotSharingTest extends TestLogger {
 
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
 
-			Instance i1 = getRandomInstance(2);
-			Instance i2 = getRandomInstance(2);
-
-			TaskManagerLocation loc1 = i1.getTaskManagerLocation();
-			TaskManagerLocation loc2 = i2.getTaskManagerLocation();
+			TaskManagerLocation loc1 = testingSlotProvider.addTaskManager(2);
+			TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2);
 
-			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			scheduler.newInstanceAvailable(i1);
-			scheduler.newInstanceAvailable(i2);
-			
-			
 			// schedule one to each instance
-			LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
-			LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+			LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
 			assertNotNull(s1);
 			assertNotNull(s2);
 			
-			assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(0, i1.getNumberOfAvailableSlots());
-			assertEquals(2, i2.getNumberOfAvailableSlots());
+			assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
+			assertEquals(loc1, s1.getTaskManagerLocation());
+			assertEquals(loc1, s2.getTaskManagerLocation());
 			
 			// schedule one from the other group to each instance
-			LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
-			LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
+			LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2)).get();
+			LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2)).get();
 			assertNotNull(s3);
 			assertNotNull(s4);
 			
-			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			assertEquals(0, i1.getNumberOfAvailableSlots());
-			assertEquals(0, i2.getNumberOfAvailableSlots());
+			assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
+			assertEquals(loc2, s3.getTaskManagerLocation());
+			assertEquals(loc2, s4.getTaskManagerLocation());
 			
 			// check the scheduler's bookkeeping
-			assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
-			assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
-			assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
+			assertEquals(4, testingSlotProvider.getNumberOfLocalizedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+			assertEquals(0, testingSlotProvider.getNumberOfUnconstrainedAssignments());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -743,24 +720,18 @@ public class SchedulerSlotSharingTest extends TestLogger {
 
 			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
 
-			Instance i1 = getRandomInstance(2);
-			Instance i2 = getRandomInstance(2);
+			TaskManagerLocation loc1 = testingSlotProvider.addTaskManager(2);
+			TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2);
 
-			TaskManagerLocation loc1 = i1.getTaskManagerLocation();
-
-			Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-			scheduler.newInstanceAvailable(i1);
-			scheduler.newInstanceAvailable(i2);
-			
 			// schedule until the one instance is full
-			LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
-			LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
-			LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
-			LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+			LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+			LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+			LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
 
 			// schedule two more with preference of same instance --> need to go to other instance
-			LogicalSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
-			LogicalSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+			LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+			LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -769,22 +740,21 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			assertNotNull(s5);
 			assertNotNull(s6);
 			
-			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
-			
-			assertEquals(0, i1.getNumberOfAvailableSlots());
-			assertEquals(0, i2.getNumberOfAvailableSlots());
-			
-			assertEquals(i1.getTaskManagerLocation(), s1.getTaskManagerLocation());
-			assertEquals(i1.getTaskManagerLocation(), s2.getTaskManagerLocation());
-			assertEquals(i1.getTaskManagerLocation(), s3.getTaskManagerLocation());
-			assertEquals(i1.getTaskManagerLocation(), s4.getTaskManagerLocation());
-			assertEquals(i2.getTaskManagerLocation(), s5.getTaskManagerLocation());
-			assertEquals(i2.getTaskManagerLocation(), s6.getTaskManagerLocation());
+			assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
+
+			assertEquals(loc1, s1.getTaskManagerLocation());
+			assertEquals(loc1, s2.getTaskManagerLocation());
+			assertEquals(loc1, s3.getTaskManagerLocation());
+			assertEquals(loc1, s4.getTaskManagerLocation());
+			assertEquals(loc2, s5.getTaskManagerLocation());
+			assertEquals(loc2, s6.getTaskManagerLocation());
 			
 			// check the scheduler's bookkeeping
-			assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
-			assertEquals(2, scheduler.getNumberOfNonLocalizedAssignments());
-			assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
+			assertEquals(4, testingSlotProvider.getNumberOfLocalizedAssignments());
+			// Flip-6 supports host-localized assignments, which happen in this case because all TaskManagerLocations point to the loopback address, so the two fallback assignments may be counted as either non-localized or host-localized
+			assertTrue(2 == testingSlotProvider.getNumberOfNonLocalizedAssignments() || 2 == testingSlotProvider.getNumberOfHostLocalizedAssignments());
+
+			assertEquals(0, testingSlotProvider.getNumberOfUnconstrainedAssignments());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -802,23 +772,22 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
 			
-			final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-			scheduler.newInstanceAvailable(getRandomInstance(4));
-			
+			testingSlotProvider.addTaskManager(4);
+
 			// allocate something from group 1 and 2 interleaved with schedule for group 3
-			LogicalSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot slot_1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot slot_1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 
-			LogicalSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot slot_2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot slot_2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
-			LogicalSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot slot_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
-			LogicalSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot slot_1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot slot_1_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
-			LogicalSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot slot_2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot slot_2_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
 			// release groups 1 and 2
 			
@@ -834,10 +803,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			
 			// allocate group 4
 			
-			LogicalSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot slot_4_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot slot_4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot slot_4_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot slot_4_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
 			// release groups 3 and 4
 			
@@ -859,6 +828,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 		final ExecutorService executor = Executors.newFixedThreadPool(20);
 
 		try {
+			testingSlotProvider.addTaskManager(4);
+
 			for (int run = 0; run < 50; run++) {
 				final JobVertexID jid1 = new JobVertexID();
 				final JobVertexID jid2 = new JobVertexID();
@@ -866,10 +837,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 				final JobVertexID jid4 = new JobVertexID();
 				
 				final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
-				
-				final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-				scheduler.newInstanceAvailable(getRandomInstance(4));
-				
+
 				final AtomicInteger enumerator1 = new AtomicInteger();
 				final AtomicInteger enumerator2 = new AtomicInteger();
 				final AtomicBoolean flag3 = new AtomicBoolean();
@@ -883,13 +851,11 @@ public class SchedulerSlotSharingTest extends TestLogger {
 				// use atomic integer as a mutable integer reference
 				final AtomicInteger completed = new AtomicInteger();
 				
-				
 				final Runnable deploy4 = new Runnable() {
 					@Override
 					public void run() {
 						try {
-							LogicalSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
-
+							LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 							sleepUninterruptibly(rnd.nextInt(5));
 							slot.releaseSlot();
 
@@ -911,8 +877,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
 					public void run() {
 						try {
 							if (flag3.compareAndSet(false, true)) {
-								LogicalSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get();
-								
+								LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 								sleepUninterruptibly(5);
 								
 								executor.execute(deploy4);
@@ -940,8 +905,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 					@Override
 					public void run() {
 						try {
-							LogicalSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
-							
+							LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+
 							// wait a bit till scheduling the successor
 							sleepUninterruptibly(rnd.nextInt(5));
 							executor.execute(deploy3);
@@ -967,8 +932,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
 					@Override
 					public void run() {
 						try {
-							LogicalSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
-							
+							LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+
 							// wait a bit till scheduling the successor
 							sleepUninterruptibly(rnd.nextInt(5));
 							executor.execute(deploy2);
@@ -1012,14 +977,12 @@ public class SchedulerSlotSharingTest extends TestLogger {
 				
 				assertFalse("Thread failed", failed.get());
 				
-				while (scheduler.getNumberOfAvailableSlots() < 4) {
+				while (testingSlotProvider.getNumberOfAvailableSlots() < 4) {
 					sleepUninterruptibly(5);
 				}
 				
-				assertEquals(1, scheduler.getNumberOfAvailableInstances());
-				assertEquals(1, scheduler.getNumberOfInstancesWithAvailableSlots());
-				assertEquals(4, scheduler.getNumberOfAvailableSlots());
-				assertEquals(13, scheduler.getNumberOfUnconstrainedAssignments());
+				assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
+				assertEquals(13 * (run + 1), testingSlotProvider.getNumberOfUnconstrainedAssignments());
 			}
 		}
 		catch (Exception e) {
@@ -1042,27 +1005,27 @@ public class SchedulerSlotSharingTest extends TestLogger {
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// schedule one task for the first and second vertex
-			LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
 			assertEquals( s1.getTaskManagerLocation(), s2.getTaskManagerLocation() );
 			assertEquals(3, scheduler.getNumberOfAvailableSlots());
 			
-			LogicalSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
 			s1.releaseSlot();
 			s2.releaseSlot();
 			
-			LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get();
-			LogicalSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+			LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+			LogicalSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 			
 			try {
-				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false, Collections.emptyList()).get();
+				scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
 				fail("should throw an exception");
 			}
 			catch (ExecutionException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
new file mode 100644
index 0000000..8fd5f9e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+	// Checks that the Scheduler's available-instance and available-slot counters follow instances being added and removed.
+	@Test
+	public void testAddAndRemoveInstance() {
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+		Instance i1 = getRandomInstance(2);
+		Instance i2 = getRandomInstance(2);
+		Instance i3 = getRandomInstance(2);
+
+		assertEquals(0, scheduler.getNumberOfAvailableInstances());
+		assertEquals(0, scheduler.getNumberOfAvailableSlots());
+		scheduler.newInstanceAvailable(i1);
+		assertEquals(1, scheduler.getNumberOfAvailableInstances());
+		assertEquals(2, scheduler.getNumberOfAvailableSlots());
+		scheduler.newInstanceAvailable(i2);
+		assertEquals(2, scheduler.getNumberOfAvailableInstances());
+		assertEquals(4, scheduler.getNumberOfAvailableSlots());
+		scheduler.newInstanceAvailable(i3);
+		assertEquals(3, scheduler.getNumberOfAvailableInstances());
+		assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+		// cannot add available instance again
+		try {
+			scheduler.newInstanceAvailable(i2);
+			fail("Scheduler accepted instance twice");
+		}
+		catch (IllegalArgumentException e) {
+			// expected: an instance that is already registered must be rejected
+		}
+
+		// some instances die
+		assertEquals(3, scheduler.getNumberOfAvailableInstances());
+		assertEquals(6, scheduler.getNumberOfAvailableSlots());
+		scheduler.instanceDied(i2);
+		assertEquals(2, scheduler.getNumberOfAvailableInstances());
+		assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+		// try to add a dead instance
+		try {
+			scheduler.newInstanceAvailable(i2);
+			fail("Scheduler accepted dead instance");
+		}
+		catch (IllegalArgumentException e) {
+			// expected: an instance that was reported dead must be rejected
+
+		}
+
+		scheduler.instanceDied(i1);
+		assertEquals(1, scheduler.getNumberOfAvailableInstances());
+		assertEquals(2, scheduler.getNumberOfAvailableSlots());
+		scheduler.instanceDied(i3);
+		assertEquals(0, scheduler.getNumberOfAvailableInstances());
+		assertEquals(0, scheduler.getNumberOfAvailableSlots());
+
+		assertFalse(i1.isAlive());
+		assertFalse(i2.isAlive());
+		assertFalse(i3.isAlive());
+	}
+}