Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:47 UTC

[23/63] [abbrv] Redesign Scheduler part 2

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
new file mode 100644
index 0000000..afc0db9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -0,0 +1,633 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.areAllDistinct;
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex;
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertexWithLocation;
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/**
+ * Tests for the scheduler when scheduling tasks in slot sharing groups.
+ */
+public class SchedulerSlotSharingTest {
+	
+	@Test
+	public void scheduleSingleVertexType() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1);
+			
+			DefaultScheduler scheduler = new DefaultScheduler();
+			Instance i1 = getRandomInstance(2);
+			Instance i2 = getRandomInstance(2);
+			scheduler.newInstanceAvailable(i1);
+			scheduler.newInstanceAvailable(i2);
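+			// 2 instances with 2 slots each give 4 physical slots; since jid1 is the
+			// only vertex in the sharing group, at most 4 of its tasks can run at once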
+			
+			// schedule 4 tasks from the first vertex group
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup));
+			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup));
+			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup));
+			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup));
+			
+			assertNotNull(s1);
+			assertNotNull(s2);
+			assertNotNull(s3);
+			assertNotNull(s4);
+			
+			assertTrue(areAllDistinct(s1, s2, s3, s4));
+			
+			// we cannot schedule another task from the first vertex group
+			try {
+				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup));
+				fail("Scheduler accepted too many tasks at the same time");
+			}
+			catch (NoResourceAvailableException e) {
+				// good!
+			}
+			catch (Exception e) {
+				fail("Wrong exception.");
+			}
+			
+			// release something
+			s3.releaseSlot();
+			
+			// allocate another slot from that group
+			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup));
+			assertNotNull(s5);
+			
+			// release all old slots
+			s1.releaseSlot();
+			s2.releaseSlot();
+			s4.releaseSlot();
+			
+			AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup));
+			AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup));
+			AllocatedSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup));
+			
+			assertNotNull(s6);
+			assertNotNull(s7);
+			assertNotNull(s8);
+			
+			// make sure we have two slots on the first instance, and two on the second
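+			// (each slot on i1 counts +1, each slot on i2 counts -1; the sum is 0 iff they split 2/2)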
+			int c = 0;
+			c += (s5.getInstance() == i1) ? 1 : -1;
+			c += (s6.getInstance() == i1) ? 1 : -1;
+			c += (s7.getInstance() == i1) ? 1 : -1;
+			c += (s8.getInstance() == i1) ? 1 : -1;
+			assertEquals(0, c);
+			
+			// release all
+			s5.releaseSlot();
+			s6.releaseSlot();
+			s7.releaseSlot();
+			s8.releaseSlot();
+			
+			// test that everything is released
+			assertEquals(4, scheduler.getNumberOfAvailableSlots());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void scheduleImmediatelyWithSharing() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
+			
+			DefaultScheduler scheduler = new DefaultScheduler();
+			scheduler.newInstanceAvailable(getRandomInstance(2));
+			scheduler.newInstanceAvailable(getRandomInstance(2));
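+			// with two vertex IDs in the group, each of the 4 physical slots can hold
+			// one subtask of jid1 and one of jid2, i.e. up to 4 tasks per vertex group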
+			
+			// schedule 4 tasks from the first vertex group
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup));
+			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup));
+			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup));
+			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup));
+			
+			assertNotNull(s1);
+			assertNotNull(s2);
+			assertNotNull(s3);
+			assertNotNull(s4);
+			
+			assertTrue(areAllDistinct(s1, s2, s3, s4));
+			
+			// we cannot schedule another task from the first vertex group
+			try {
+				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
+				fail("Scheduler accepted too many tasks at the same time");
+			}
+			catch (NoResourceAvailableException e) {
+				// good!
+			}
+			catch (Exception e) {
+				fail("Wrong exception.");
+			}
+			
+			// schedule some tasks from the second ID group
+			AllocatedSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup));
+			AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup));
+			AllocatedSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup));
+			AllocatedSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup));
+			
+			assertNotNull(s1_2);
+			assertNotNull(s2_2);
+			assertNotNull(s3_2);
+			assertNotNull(s4_2);
+			
+			// we cannot schedule another task from the second vertex group
+			try {
+				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
+				fail("Scheduler accepted too many tasks at the same time");
+			}
+			catch (NoResourceAvailableException e) {
+				// good!
+			}
+			catch (Exception e) {
+				fail("Wrong exception.");
+			}
+			
+			// now, we release some vertices (sub-slots) from the first group.
+			// that should allow us to schedule more vertices from the first group
+			s1.releaseSlot();
+			s4.releaseSlot();
+			
+			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
+			assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2));
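+			// the shared slots stay allocated, because the jid2 sub-slots still occupy
+			// them; only the two vacated jid1 positions have become available again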
+			
+			// we still cannot schedule anything from the second group of vertices
+			try {
+				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
+				fail("Scheduler accepted too many tasks at the same time");
+			}
+			catch (NoResourceAvailableException e) {
+				// good!
+			}
+			catch (Exception e) {
+				fail("Wrong exception.");
+			}
+			
+			// we can schedule something from the first vertex group
+			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
+			assertNotNull(s5);
+			
+			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
+			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2));
+			
+			
+			// now we release a slot from the second vertex group and schedule another task from that group
+			s2_2.releaseSlot();
+			AllocatedSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
+			assertNotNull(s5_2);
+			
+			// release all slots
+			s2.releaseSlot();
+			s3.releaseSlot();
+			s5.releaseSlot();
+			
+			s1_2.releaseSlot();
+			s3_2.releaseSlot();
+			s4_2.releaseSlot();
+			s5_2.releaseSlot();
+			
+			// test that everything is released
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
+			assertEquals(4, scheduler.getNumberOfAvailableSlots());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void scheduleImmediatelyWithTemporarilyEmptyVertexGroup() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			JobVertexID jid3 = new JobVertexID();
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3);
+			
+			DefaultScheduler scheduler = new DefaultScheduler();
+			scheduler.newInstanceAvailable(getRandomInstance(2));
+			scheduler.newInstanceAvailable(getRandomInstance(2));
+			
+			// schedule 4 tasks from the first vertex group
+			AllocatedSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
+			AllocatedSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
+			AllocatedSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+			AllocatedSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+			
+			assertNotNull(s1_1);
+			assertNotNull(s2_1);
+			assertNotNull(s3_1);
+			assertNotNull(s4_1);
+			
+			assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
+			
+			// schedule 4 tasks from the second vertex group
+			AllocatedSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup));
+			AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup));
+			AllocatedSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup));
+			AllocatedSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup));
+			
+			assertNotNull(s1_2);
+			assertNotNull(s2_2);
+			assertNotNull(s3_2);
+			assertNotNull(s4_2);
+			
+			assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
+			
+			// schedule 4 tasks from the third vertex group
+			AllocatedSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup));
+			AllocatedSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup));
+			AllocatedSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup));
+			AllocatedSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup));
+			
+			assertNotNull(s1_3);
+			assertNotNull(s2_3);
+			assertNotNull(s3_3);
+			assertNotNull(s4_3);
+			
+			assertTrue(areAllDistinct(s1_3, s2_3, s3_3, s4_3));
+			
+			
+			// we cannot schedule another task from the first vertex group
+			try {
+				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
+				fail("Scheduler accepted too many tasks at the same time");
+			}
+			catch (NoResourceAvailableException e) {
+				// good!
+			}
+			catch (Exception e) {
+				fail("Wrong exception.");
+			}
+			
+			// release the second vertex group
+			s1_2.releaseSlot();
+			s2_2.releaseSlot();
+			s3_2.releaseSlot();
+			s4_2.releaseSlot();
+			
+			AllocatedSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup));
+			AllocatedSlot s6_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup));
+			AllocatedSlot s7_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup));
+			
+			assertNotNull(s5_2);
+			assertNotNull(s6_2);
+			assertNotNull(s7_2);
+			
+			// release the slots
+			s1_1.releaseSlot();
+			s2_1.releaseSlot();
+			s3_1.releaseSlot();
+			s4_1.releaseSlot();
+			
+			s5_2.releaseSlot();
+			s6_2.releaseSlot();
+			s7_2.releaseSlot();
+			
+			// not everything is released yet: the third vertex group still occupies all four slots
+			assertEquals(0, scheduler.getNumberOfAvailableSlots());
+			
+			s1_3.releaseSlot();
+			s2_3.releaseSlot();
+			s3_3.releaseSlot();
+			s4_3.releaseSlot();
+			
+			// test that everything is released
+			assertEquals(4, scheduler.getNumberOfAvailableSlots());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void scheduleImmediatelyWithTemporarilyEmptyVertexGroup2() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			JobVertexID jid3 = new JobVertexID();
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
+			
+			DefaultScheduler scheduler = new DefaultScheduler();
+			scheduler.newInstanceAvailable(getRandomInstance(2));
+			
+			// schedule one task from the first vertex group and two from the second
+			AllocatedSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup));
+			AllocatedSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup));
+			AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup));
+			
+			assertNotNull(s1_1);
+			assertNotNull(s2_1);
+			assertNotNull(s2_2);
+			
+			assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfSlots());
+			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2));
+			
+			// release the two from the second
+			s2_1.releaseSlot();
+			s2_2.releaseSlot();
+			
+			
+			// releasing both jid2 sub-slots empties one shared slot, so one non-shared task can now be allocated
+			AllocatedSlot sx = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1)));
+			assertNotNull(sx);
+			
+			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots());
+			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
+			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2));
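+			// the remaining shared slot still hosts the jid1 sub-slot, and its jid2 position is free again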
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void scheduleMixedSharingAndNonSharing() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			
+			JobVertexID jidA = new JobVertexID();
+			JobVertexID jidB = new JobVertexID();
+			JobVertexID jidC = new JobVertexID();
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
+			
+			DefaultScheduler scheduler = new DefaultScheduler();
+			scheduler.newInstanceAvailable(getRandomInstance(3));
+			scheduler.newInstanceAvailable(getRandomInstance(2));
+			
+			// schedule some individual vertices
+			AllocatedSlot sA1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 0, 2)));
+			AllocatedSlot sA2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 1, 2)));
+			assertNotNull(sA1);
+			assertNotNull(sA2);
+			
+			// schedule some vertices in the sharing group
+			AllocatedSlot s1_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
+			AllocatedSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
+			AllocatedSlot s2_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
+			AllocatedSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
+			assertNotNull(s1_0);
+			assertNotNull(s1_1);
+			assertNotNull(s2_0);
+			assertNotNull(s2_1);
+			
+			// schedule another isolated vertex
+			AllocatedSlot sB1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 1, 3)));
+			assertNotNull(sB1);
+			
+			// should not be able to schedule more vertices
+			try {
+				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+				fail("Scheduler accepted too many tasks at the same time");
+			}
+			catch (NoResourceAvailableException e) {
+				// good!
+			}
+			catch (Exception e) {
+				fail("Wrong exception.");
+			}
+			
+			try {
+				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
+				fail("Scheduler accepted too many tasks at the same time");
+			}
+			catch (NoResourceAvailableException e) {
+				// good!
+			}
+			catch (Exception e) {
+				fail("Wrong exception.");
+			}
+			
+			try {
+				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 0, 3)));
+				fail("Scheduler accepted too many tasks at the same time");
+			}
+			catch (NoResourceAvailableException e) {
+				// good!
+			}
+			catch (Exception e) {
+				fail("Wrong exception.");
+			}
+			
+			try {
+				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 0, 1)));
+				fail("Scheduler accepted too many tasks at the same time");
+			}
+			catch (NoResourceAvailableException e) {
+				// good!
+			}
+			catch (Exception e) {
+				fail("Wrong exception.");
+			}
+			
+			// release an isolated task and check that the sharing group may grow
+			sA1.releaseSlot();
+			
+			AllocatedSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+			AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
+			assertNotNull(s1_2);
+			assertNotNull(s2_2);
+			
+			// release three of the previously allocated sub-slots, which is guaranteed to free one shared slot
+			s1_0.releaseSlot();
+			s1_1.releaseSlot();
+			s2_0.releaseSlot();
+			
+			assertEquals(1, scheduler.getNumberOfAvailableSlots());
+			
+			// schedule one more non-shared task
+			AllocatedSlot sB0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 0, 3)));
+			assertNotNull(sB0);
+			
+			// release the last of the original shared slots and allocate one more non-shared slot
+			s2_1.releaseSlot();
+			AllocatedSlot sB2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 2, 3)));
+			assertNotNull(sB2);
+			
+			
+			// release one non-shared slot and add some shared slots
+			sA2.releaseSlot();
+			AllocatedSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+			AllocatedSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
+			assertNotNull(s1_3);
+			assertNotNull(s2_3);
+			
+			// release all shared and allocate all in non-shared
+			s1_2.releaseSlot();
+			s2_2.releaseSlot();
+			s1_3.releaseSlot();
+			s2_3.releaseSlot();
+			
+			AllocatedSlot sC0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 1, 2)));
+			AllocatedSlot sC1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 0, 2)));
+			assertNotNull(sC0);
+			assertNotNull(sC1);
+			
+			
+			sB0.releaseSlot();
+			sB1.releaseSlot();
+			sB2.releaseSlot();
+			sC0.releaseSlot();
+			sC1.releaseSlot();
+			
+			// test that everything is released
+			assertEquals(5, scheduler.getNumberOfAvailableSlots());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Tests that the scheduler assigns tasks to the correct existing shared slots.
+	 */
+	@Test
+	public void testLocalizedAssignment1() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
+			
+			Instance i1 = getRandomInstance(2);
+			Instance i2 = getRandomInstance(2);
+			
+			DefaultScheduler scheduler = new DefaultScheduler();
+			scheduler.newInstanceAvailable(i1);
+			scheduler.newInstanceAvailable(i2);
+			
+			
+			// schedule one to each instance
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
+			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup));
+			assertNotNull(s1);
+			assertNotNull(s2);
+			
+			assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfSlots());
+			assertEquals(1, i1.getNumberOfAvailableSlots());
+			assertEquals(1, i2.getNumberOfAvailableSlots());
+			
+			// schedule one from the other group to each instance
+			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup));
+			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup));
+			assertNotNull(s3);
+			assertNotNull(s4);
+			
+			assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfSlots());
+			assertEquals(1, i1.getNumberOfAvailableSlots());
+			assertEquals(1, i2.getNumberOfAvailableSlots());
+			
+			// check the scheduler's bookkeeping
+			assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Tests that the scheduler assigns tasks to new local slots, rather than to existing non-local slots.
+	 */
+	@Test
+	public void testLocalizedAssignment2() {
+		try {
+			JobVertexID jid1 = new JobVertexID();
+			JobVertexID jid2 = new JobVertexID();
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
+			
+			Instance i1 = getRandomInstance(2);
+			Instance i2 = getRandomInstance(2);
+			
+			DefaultScheduler scheduler = new DefaultScheduler();
+			scheduler.newInstanceAvailable(i1);
+			scheduler.newInstanceAvailable(i2);
+			
+			
+			// schedule both tasks of the first vertex group to the first instance
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
+			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup));
+			assertNotNull(s1);
+			assertNotNull(s2);
+			
+			assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfSlots());
+			assertEquals(0, i1.getNumberOfAvailableSlots());
+			assertEquals(2, i2.getNumberOfAvailableSlots());
+			
+			// schedule both tasks of the second vertex group to the second instance
+			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup));
+			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup));
+			assertNotNull(s3);
+			assertNotNull(s4);
+			
+			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
+			assertEquals(0, i1.getNumberOfAvailableSlots());
+			assertEquals(0, i2.getNumberOfAvailableSlots());
+			
+			// check the scheduler's bookkeeping
+			assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Tests that the scheduler can fall back to non-local assignments.
+	 */
+	@Test
+	public void testLocalizedAssignment3() {
+		try {
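+			// not yet implemented: a fall-back test would fill one instance with tasks
+			// that prefer it, then assert that the next task with the same preference
+			// is placed on the other instance instead of failing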
+			
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
new file mode 100644
index 0000000..78a397e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+
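+/**
+ * Utilities for the scheduler tests: factory methods for mock instances and
+ * execution vertices, plus small collection helpers.
+ */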
+public class SchedulerTestUtils {
+	
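+	// hands out unique IPC/data ports so that generated connection infos are distinct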
+	private static final AtomicInteger port = new AtomicInteger(10000);
+
+	// --------------------------------------------------------------------------------------------
+	
+	public static Instance getRandomInstance(int numSlots) {
+		if (numSlots <= 0) {
+			throw new IllegalArgumentException();
+		}
+		
+		InetAddress address;
+		try {
+			address = InetAddress.getByName("127.0.0.1");
+		} catch (UnknownHostException e) {
+			throw new RuntimeException("Test could not create IP address for localhost loopback.", e);
+		}
+		
+		int ipcPort = port.getAndIncrement();
+		int dataPort = port.getAndIncrement();
+		
+		InstanceConnectionInfo ci = new InstanceConnectionInfo(address, ipcPort, dataPort);
+		
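+		// the concrete hardware figures below are arbitrary; the tests only rely on the slot count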
+		final long GB = 1024L*1024*1024;
+		HardwareDescription resources = new HardwareDescription(4, 4*GB, 3*GB, 2*GB);
+		
+		return new Instance(ci, new InstanceID(), resources, numSlots);
+	}
+	
+	
+	public static ExecutionVertex2 getDummyVertex() {
+		ExecutionVertex2 vertex = mock(ExecutionVertex2.class);
+		
+		when(vertex.getJobId()).thenReturn(new JobID());
+		when(vertex.toString()).thenReturn("TEST-VERTEX");
+		
+		return vertex;
+	}
+	
+	public static ExecutionVertex2 getTestVertex(JobVertexID jid, int taskIndex, int numTasks) {
+		ExecutionVertex2 vertex = mock(ExecutionVertex2.class);
+		
+		when(vertex.getPreferredLocations()).thenReturn(null);
+		when(vertex.getJobId()).thenReturn(new JobID());
+		when(vertex.getJobvertexId()).thenReturn(jid);
+		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
+		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
+		when(vertex.toString()).thenReturn("TEST-VERTEX");
+		
+		return vertex;
+	}
+	
+	public static ExecutionVertex2 getTestVertexWithLocation(JobVertexID jid, int taskIndex, int numTasks, Instance... locations) {
+		ExecutionVertex2 vertex = mock(ExecutionVertex2.class);
+		
+		when(vertex.getPreferredLocations()).thenReturn(Arrays.asList(locations));
+		when(vertex.getJobId()).thenReturn(new JobID());
+		when(vertex.getJobvertexId()).thenReturn(jid);
+		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
+		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
+		when(vertex.toString()).thenReturn("TEST-VERTEX");
+		
+		return vertex;
+	}
+	
+	public static ExecutionVertex2 getTestVertex(Iterable<Instance> preferredLocations) {
+		ExecutionVertex2 vertex = mock(ExecutionVertex2.class);
+		
+		when(vertex.getPreferredLocations()).thenReturn(preferredLocations);
+		when(vertex.getJobId()).thenReturn(new JobID());
+		when(vertex.toString()).thenReturn("TEST-VERTEX");
+		
+		return vertex;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static boolean areAllDistinct(Object ... obj) {
+		if (obj == null) {
+			return true;
+		}
+		
+		HashSet<Object> set = new HashSet<Object>();
+		for (Object o : obj) {
+			set.add(o);
+		}
+		
+		return set.size() == obj.length;
+	}
+	
+	public static boolean areSameSets(Collection<Object> set1, Collection<Object> set2) {
+		if (set1 == null || set2 == null) {
+			throw new IllegalArgumentException();
+		}
+		
+		HashSet<Object> set = new HashSet<Object>(set1);
+		for (Object o : set2) {
+			if (!set.remove(o)) {
+				return false;
+			}
+		}
+		
+		return set.isEmpty();
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SetQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SetQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SetQueueTest.java
new file mode 100644
index 0000000..9b9a899
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SetQueueTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import static org.junit.Assert.*;
+
+import java.util.Queue;
+
+import org.junit.Test;
+
+/**
+ * Tests for the basic functionality of the {@link SetQueue}.
+ */
+public class SetQueueTest {
+
+	@Test
+	public void testSizeAddPollAndPeek() {
+		try {
+			Queue<Integer> queue = new SetQueue<Integer>();
+			
+			// empty queue
+			assertEquals(0, queue.size());
+			assertNull(queue.poll());
+			assertNull(queue.peek());
+			
+			// add some elements
+			assertTrue(queue.add(1));
+			assertTrue(queue.offer(2));
+			assertTrue(queue.offer(3));
+			assertEquals(3, queue.size());
+			
+			assertEquals(1, queue.peek().intValue());
+			
+			// duplicates are silently ignored. note that the methods still return true, because no capacity constraint is violated
+			assertTrue(queue.add(1));
+			assertTrue(queue.offer(1));
+			assertTrue(queue.add(3));
+			assertTrue(queue.offer(3));
+			assertTrue(queue.add(2));
+			assertTrue(queue.offer(2));
+			assertEquals(3, queue.size());
+			
+			// peek and poll some elements
+			assertEquals(1, queue.peek().intValue());
+			assertEquals(3, queue.size());
+			assertEquals(1, queue.poll().intValue());
+			assertEquals(2, queue.size());
+			assertEquals(2, queue.peek().intValue());
+			assertEquals(2, queue.size());
+			assertEquals(2, queue.poll().intValue());
+			assertEquals(1, queue.size());
+			assertEquals(3, queue.peek().intValue());
+			assertEquals(1, queue.size());
+			assertEquals(3, queue.poll().intValue());
+			assertEquals(0, queue.size());
+			assertTrue(queue.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " : " + e.getMessage());
+		}
+	}
+	
+
+	@Test
+	public void testOrder() {
+		try {
+			Queue<Integer> queue = new SetQueue<Integer>();
+			
+			queue.add(1);
+			queue.add(2);
+			queue.add(3);
+			
+			assertEquals(1, queue.peek().intValue());
+			
+			queue.add(4);
+			assertEquals(1, queue.peek().intValue());
+			
+			queue.remove(2);
+			assertEquals(1, queue.peek().intValue());
+			
+			queue.remove(4);
+			assertEquals(1, queue.peek().intValue());
+			
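+			// removing an element that is no longer present has no effect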
+			queue.remove(2);
+			assertEquals(1, queue.poll().intValue());
+			assertEquals(3, queue.poll().intValue());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " : " + e.getMessage());
+		}
+	}
+}
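
For reference, the contract exercised above - FIFO order across add/offer/peek/poll,
silent de-duplication, and removal of arbitrary elements - is what a LinkedHashSet-backed
queue provides. A minimal sketch under that assumption (an illustration only, not the
actual SetQueue implementation from this patch):

    import java.util.AbstractQueue;
    import java.util.Iterator;
    import java.util.LinkedHashSet;

    /** FIFO queue with set semantics, backed by a LinkedHashSet. Illustration only. */
    public class LinkedHashSetQueue<E> extends AbstractQueue<E> {

        // keeps insertion order and silently rejects duplicates
        private final LinkedHashSet<E> set = new LinkedHashSet<E>();

        @Override
        public boolean offer(E e) {
            if (e == null) {
                throw new NullPointerException();
            }
            set.add(e);   // no-op if the element is already queued
            return true;  // unbounded queue: no capacity constraint can be violated
        }

        @Override
        public E poll() {
            Iterator<E> iter = set.iterator();
            if (!iter.hasNext()) {
                return null;
            }
            E head = iter.next();
            iter.remove();
            return head;
        }

        @Override
        public E peek() {
            Iterator<E> iter = set.iterator();
            return iter.hasNext() ? iter.next() : null;
        }

        @Override
        public Iterator<E> iterator() {
            return set.iterator();
        }

        @Override
        public int size() {
            return set.size();
        }
    }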

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
new file mode 100644
index 0000000..de22999
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
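+/**
+ * Tests for the sub-slot allocation and release protocol of shared slots.
+ */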
+public class SharedSlotsTest {
+
+	@Test
+	public void createAndDoNotRelease() {
+		try {
+			SlotSharingGroupAssignment assignment = mock(SlotSharingGroupAssignment.class);
+			when(assignment.sharedSlotAvailableForJid(any(SharedSlot.class), any(JobVertexID.class), any(boolean.class))).thenReturn(false);
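+			// the mocked assignment reports the slot as still needed, so releasing all sub-slots must not dispose it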
+			
+			Instance instance = SchedulerTestUtils.getRandomInstance(1);
+			
+			SharedSlot slot = new SharedSlot(instance.allocateSlot(new JobID()), assignment);
+			assertFalse(slot.isDisposed());
+			
+			SubSlot ss1 = slot.allocateSubSlot(new JobVertexID());
+			assertNotNull(ss1);
+			
+			// verify resources
+			assertEquals(instance, ss1.getInstance());
+			assertEquals(0, ss1.getSlotNumber());
+			assertEquals(slot.getAllocatedSlot().getJobID(), ss1.getJobID());
+			
+			SubSlot ss2 = slot.allocateSubSlot(new JobVertexID());
+			assertNotNull(ss2);
+			
+			assertEquals(2, slot.getNumberOfAllocatedSubSlots());
+			
+			// release the first sub-slot; this must not dispose the shared slot
+			ss1.releaseSlot();
+			assertFalse(slot.isDisposed());
+			
+			ss2.releaseSlot();
+			assertFalse(slot.isDisposed());
+			
+			// all sub-slots are released, but the mocked assignment keeps the shared slot alive
+			assertEquals(0, slot.getNumberOfAllocatedSubSlots());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void createAndRelease() {
+		try {
+			SlotSharingGroupAssignment assignment = mock(SlotSharingGroupAssignment.class);
+			when(assignment.sharedSlotAvailableForJid(any(SharedSlot.class), any(JobVertexID.class), eq(false))).thenReturn(false);
+			when(assignment.sharedSlotAvailableForJid(any(SharedSlot.class), any(JobVertexID.class), eq(true))).thenReturn(true);
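+			// here the mocked assignment allows disposal once the last sub-slot is released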
+			
+			Instance instance = SchedulerTestUtils.getRandomInstance(1);
+			
+			SharedSlot slot = new SharedSlot(instance.allocateSlot(new JobID()), assignment);
+			assertFalse(slot.isDisposed());
+			
+			SubSlot ss1 = slot.allocateSubSlot(new JobVertexID());
+			assertNotNull(ss1);
+			
+			// verify resources
+			assertEquals(instance, ss1.getInstance());
+			assertEquals(0, ss1.getSlotNumber());
+			assertEquals(slot.getAllocatedSlot().getJobID(), ss1.getJobID());
+			
+			SubSlot ss2 = slot.allocateSubSlot(new JobVertexID());
+			assertNotNull(ss2);
+			
+			assertEquals(2, slot.getNumberOfAllocatedSubSlots());
+			
+			// release the first sub-slot; this must not dispose the shared slot
+			ss1.releaseSlot();
+			assertFalse(slot.isDisposed());
+			
+			ss2.releaseSlot();
+			assertTrue(slot.isDisposed());
+			
+			// the shared slot should now dispose itself
+			assertEquals(0, slot.getNumberOfAllocatedSubSlots());
+			
+			assertNull(slot.allocateSubSlot(new JobVertexID()));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
new file mode 100644
index 0000000..423004f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.junit.Test;
+
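+/**
+ * Tests for the {@link SlotAllocationFuture}: slot and action may be set in
+ * either order, the action must run exactly once, and neither may be set twice.
+ */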
+public class SlotAllocationFutureTest {
+
+	@Test
+	public void testInvalidActions() {
+		try {
+			final SlotAllocationFuture future = new SlotAllocationFuture();
+			
+			SlotAllocationFutureAction action = new SlotAllocationFutureAction() {
+				@Override
+				public void slotAllocated(AllocatedSlot slot) {}
+			};
+			
+			future.setFutureAction(action);
+			try {
+				future.setFutureAction(action);
+				fail();
+			} catch (IllegalStateException e) {
+				// expected
+			}
+			
+			final AllocatedSlot slot1 = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+			final AllocatedSlot slot2 = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+			
+			future.setSlot(slot1);
+			try {
+				future.setSlot(slot2);
+				fail();
+			} catch (IllegalStateException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void setWithAction() {
+		try {
+			
+			// action before the slot
+			{
+				final AtomicInteger invocations = new AtomicInteger();
+				final AllocatedSlot thisSlot = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+				
+				SlotAllocationFuture future = new SlotAllocationFuture();
+				
+				future.setFutureAction(new SlotAllocationFutureAction() {
+					@Override
+					public void slotAllocated(AllocatedSlot slot) {
+						assertEquals(thisSlot, slot);
+						invocations.incrementAndGet();
+					}
+				});
+				
+				future.setSlot(thisSlot);
+				
+				assertEquals(1, invocations.get());
+			}
+			
+			// slot before action
+			{
+				final AtomicInteger invocations = new AtomicInteger();
+				final AllocatedSlot thisSlot = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+				
+				SlotAllocationFuture future = new SlotAllocationFuture();
+				future.setSlot(thisSlot);
+				
+				future.setFutureAction(new SlotAllocationFutureAction() {
+					@Override
+					public void slotAllocated(AllocatedSlot slot) {
+						assertEquals(thisSlot, slot);
+						invocations.incrementAndGet();
+					}
+				});
+				
+				assertEquals(1, invocations.get());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void setSync() {
+		try {
+			// sync before setting the slot
+			{
+				final AtomicInteger invocations = new AtomicInteger();
+				final AtomicBoolean error = new AtomicBoolean();
+				
+				final AllocatedSlot thisSlot = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+				
+				final SlotAllocationFuture future = new SlotAllocationFuture();
+				
+				
+				Runnable r = new Runnable() {
+					@Override
+					public void run() {
+						try {
+							AllocatedSlot syncSlot = future.waitTillAllocated();
+							if (syncSlot == null || syncSlot != thisSlot) {
+								error.set(true);
+								return;
+							}
+							invocations.incrementAndGet();
+						}
+						catch (Throwable t) {
+							error.set(true);
+						}
+					}
+				};
+				
+				Thread syncer = new Thread(r);
+				syncer.start();
+				
+				// wait, and give the sync thread a chance to sync
+				Thread.sleep(10);
+				future.setSlot(thisSlot);
+				
+				syncer.join();
+				
+				assertFalse(error.get());
+				assertEquals(1, invocations.get());
+			}
+			
+			// setting slot before syncing
+			{
+				final AllocatedSlot thisSlot = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+				final SlotAllocationFuture future = new SlotAllocationFuture();
+
+				future.setSlot(thisSlot);
+				
+				AllocatedSlot retrieved = future.waitTillAllocated();
+				
+				assertNotNull(retrieved);
+				assertEquals(thisSlot, retrieved);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}