You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 16:42:21 UTC
[04/11] flink git commit: [FLINK-7956] [flip6] Add support for queued
scheduling with slot sharing to SlotPool
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index 41a7f02..025c795 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -18,14 +18,15 @@
package org.apache.flink.runtime.jobmanager.scheduler;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.Collections;
import java.util.Random;
@@ -49,7 +50,12 @@ import static org.junit.Assert.fail;
/**
* Tests for the scheduler when scheduling tasks in slot sharing groups.
*/
-public class SchedulerSlotSharingTest extends TestLogger {
+@RunWith(Parameterized.class)
+public class SchedulerSlotSharingTest extends SchedulerTestBase {
+
+ public SchedulerSlotSharingTest(SchedulerType schedulerType) {
+ super(schedulerType);
+ }
@Test
public void scheduleSingleVertexType() {
@@ -57,18 +63,15 @@ public class SchedulerSlotSharingTest extends TestLogger {
JobVertexID jid1 = new JobVertexID();
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1);
-
- Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
- Instance i1 = getRandomInstance(2);
- Instance i2 = getRandomInstance(2);
- scheduler.newInstanceAvailable(i1);
- scheduler.newInstanceAvailable(i2);
+
+ final ResourceID tm1ResourceId = testingSlotProvider.addTaskManager(2).getResourceID();
+ testingSlotProvider.addTaskManager(2);
// schedule 4 tasks from the first vertex group
- LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
assertNotNull(s1);
assertNotNull(s2);
@@ -79,7 +82,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
// we cannot schedule another task from the first vertex group
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get();
+ testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -93,7 +96,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
s3.releaseSlot();
// allocate another slot from that group
- LogicalSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
assertNotNull(s5);
// release all old slots
@@ -101,9 +104,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
s2.releaseSlot();
s4.releaseSlot();
- LogicalSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s8 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
assertNotNull(s6);
assertNotNull(s7);
@@ -111,10 +114,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
// make sure we have two slots on the first instance, and two on the second
int c = 0;
- c += (s5.getTaskManagerLocation().equals(i1.getTaskManagerLocation())) ? 1 : -1;
- c += (s6.getTaskManagerLocation().equals(i1.getTaskManagerLocation())) ? 1 : -1;
- c += (s7.getTaskManagerLocation().equals(i1.getTaskManagerLocation())) ? 1 : -1;
- c += (s8.getTaskManagerLocation().equals(i1.getTaskManagerLocation())) ? 1 : -1;
+ c += (s5.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
+ c += (s6.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
+ c += (s7.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
+ c += (s8.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
assertEquals(0, c);
// release all
@@ -124,12 +127,12 @@ public class SchedulerSlotSharingTest extends TestLogger {
s8.releaseSlot();
// test that everything is released
- assertEquals(4, scheduler.getNumberOfAvailableSlots());
+ assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
// check the scheduler's bookkeeping
- assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
- assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
- assertEquals(8, scheduler.getNumberOfUnconstrainedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+ assertEquals(8, testingSlotProvider.getNumberOfUnconstrainedAssignments());
}
catch (Exception e) {
e.printStackTrace();
@@ -138,123 +141,116 @@ public class SchedulerSlotSharingTest extends TestLogger {
}
@Test
- public void allocateSlotWithSharing() {
+ public void allocateSlotWithSharing() throws Exception {
+ JobVertexID jid1 = new JobVertexID();
+ JobVertexID jid2 = new JobVertexID();
+
+ SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
+
+ testingSlotProvider.addTaskManager(2);
+ testingSlotProvider.addTaskManager(2);
+
+ // schedule 4 tasks from the first vertex group
+ LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+
+ assertNotNull(s1);
+ assertNotNull(s2);
+ assertNotNull(s3);
+ assertNotNull(s4);
+
+ assertTrue(areAllDistinct(s1, s2, s3, s4));
+
+ // we cannot schedule another task from the first vertex group
try {
- JobVertexID jid1 = new JobVertexID();
- JobVertexID jid2 = new JobVertexID();
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
- Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
- scheduler.newInstanceAvailable(getRandomInstance(2));
- scheduler.newInstanceAvailable(getRandomInstance(2));
-
- // schedule 4 tasks from the first vertex group
- LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false, Collections.emptyList()).get();
-
- assertNotNull(s1);
- assertNotNull(s2);
- assertNotNull(s3);
- assertNotNull(s4);
-
- assertTrue(areAllDistinct(s1, s2, s3, s4));
-
- // we cannot schedule another task from the first vertex group
- try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
- fail("Scheduler accepted too many tasks at the same time");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
- catch (Exception e) {
- fail("Wrong exception.");
- }
-
- // schedule some tasks from the second ID group
- LogicalSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false, Collections.emptyList()).get();
-
- assertNotNull(s1_2);
- assertNotNull(s2_2);
- assertNotNull(s3_2);
- assertNotNull(s4_2);
-
- // we cannot schedule another task from the second vertex group
- try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
- fail("Scheduler accepted too many tasks at the same time");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
- catch (Exception e) {
- fail("Wrong exception.");
- }
-
- // now, we release some vertices (sub-slots) from the first group.
- // that should allow us to schedule more vertices from the first group
- s1.releaseSlot();
- s4.releaseSlot();
-
- assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
-
- // we can still not schedule anything from the second group of vertices
- try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
- fail("Scheduler accepted too many tasks at the same time");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
- catch (Exception e) {
- fail("Wrong exception.");
- }
-
- // we can schedule something from the first vertex group
- LogicalSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
- assertNotNull(s5);
-
- assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
-
-
- // now we release a slot from the second vertex group and schedule another task from that group
- s2_2.releaseSlot();
- LogicalSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
- assertNotNull(s5_2);
-
- // release all slots
- s2.releaseSlot();
- s3.releaseSlot();
- s5.releaseSlot();
-
- s1_2.releaseSlot();
- s3_2.releaseSlot();
- s4_2.releaseSlot();
- s5_2.releaseSlot();
-
- // test that everything is released
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(4, scheduler.getNumberOfAvailableSlots());
-
- // check the scheduler's bookkeeping
- assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
- assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
- assertEquals(10, scheduler.getNumberOfUnconstrainedAssignments());
+ testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ fail("Scheduler accepted too many tasks at the same time");
+ }
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ fail("Wrong exception.");
+ }
+
+ // schedule some tasks from the second ID group
+ LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+
+ assertNotNull(s1_2);
+ assertNotNull(s2_2);
+ assertNotNull(s3_2);
+ assertNotNull(s4_2);
+
+ // we cannot schedule another task from the second vertex group
+ try {
+ testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ fail("Scheduler accepted too many tasks at the same time");
+ }
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
+ }
+ catch (Exception e) {
+ fail("Wrong exception.");
+ }
+
+ // now, we release some vertices (sub-slots) from the first group.
+ // that should allow us to schedule more vertices from the first group
+ s1.releaseSlot();
+ s4.releaseSlot();
+
+ assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(2, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+ assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
+
+ // we can still not schedule anything from the second group of vertices
+ try {
+ testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ fail("Scheduler accepted too many tasks at the same time");
}
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
+ }
+ catch (Exception e) {
+ fail("Wrong exception.");
+ }
+
+ // we can schedule something from the first vertex group
+ LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ assertNotNull(s5);
+
+ assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(1, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+ assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
+
+
+ // now we release a slot from the second vertex group and schedule another task from that group
+ s2_2.releaseSlot();
+ LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ assertNotNull(s5_2);
+
+ // release all slots
+ s2.releaseSlot();
+ s3.releaseSlot();
+ s5.releaseSlot();
+
+ s1_2.releaseSlot();
+ s3_2.releaseSlot();
+ s4_2.releaseSlot();
+ s5_2.releaseSlot();
+
+ // test that everything is released
+ assertEquals(0, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
+
+ // check the scheduler's bookkeeping
+ assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+ assertEquals(10, testingSlotProvider.getNumberOfUnconstrainedAssignments());
}
@Test
@@ -264,56 +260,55 @@ public class SchedulerSlotSharingTest extends TestLogger {
JobVertexID jid2 = new JobVertexID();
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
- Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
- scheduler.newInstanceAvailable(getRandomInstance(2));
- scheduler.newInstanceAvailable(getRandomInstance(2));
-
+
+ testingSlotProvider.addTaskManager(2);
+ testingSlotProvider.addTaskManager(2);
+
// schedule 4 tasks from the first vertex group
- LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
- assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
- assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
+ assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+ assertEquals(4, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
s1.releaseSlot();
s2.releaseSlot();
s3.releaseSlot();
s4.releaseSlot();
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
+ assertEquals(0, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+ assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
// schedule some tasks from the second ID group
- LogicalSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
- assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
+ assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(4, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+ assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
s1_2.releaseSlot();
s2_2.releaseSlot();
s3_2.releaseSlot();
s4_2.releaseSlot();
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
+ assertEquals(0, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+ assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
// test that everything is released
- assertEquals(4, scheduler.getNumberOfAvailableSlots());
+ assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
// check the scheduler's bookkeeping
- assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
- assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
- assertEquals(8, scheduler.getNumberOfUnconstrainedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+ assertEquals(8, testingSlotProvider.getNumberOfUnconstrainedAssignments());
}
catch (Exception e) {
e.printStackTrace();
@@ -329,16 +324,15 @@ public class SchedulerSlotSharingTest extends TestLogger {
JobVertexID jid3 = new JobVertexID();
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3);
-
- Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
- scheduler.newInstanceAvailable(getRandomInstance(2));
- scheduler.newInstanceAvailable(getRandomInstance(2));
-
+
+ testingSlotProvider.addTaskManager(2);
+ testingSlotProvider.addTaskManager(2);
+
// schedule 4 tasks from the first vertex group
- LogicalSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s3_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s4_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
assertNotNull(s1_1);
assertNotNull(s2_1);
@@ -348,10 +342,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
// schedule 4 tasks from the second vertex group
- LogicalSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
assertNotNull(s1_2);
assertNotNull(s2_2);
@@ -361,10 +355,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
// schedule 4 tasks from the third vertex group
- LogicalSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s3_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s4_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
assertNotNull(s1_3);
assertNotNull(s2_3);
@@ -376,7 +370,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
// we cannot schedule another task from the second vertex group
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
+ testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -392,9 +386,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
s3_2.releaseSlot();
s4_2.releaseSlot();
- LogicalSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s6_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s7_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
assertNotNull(s5_2);
assertNotNull(s6_2);
@@ -411,7 +405,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
s7_2.releaseSlot();
// test that everything is released
- assertEquals(0, scheduler.getNumberOfAvailableSlots());
+ assertEquals(0, testingSlotProvider.getNumberOfAvailableSlots());
s1_3.releaseSlot();
s2_3.releaseSlot();
@@ -419,12 +413,12 @@ public class SchedulerSlotSharingTest extends TestLogger {
s4_3.releaseSlot();
// test that everything is released
- assertEquals(4, scheduler.getNumberOfAvailableSlots());
+ assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
// check the scheduler's bookkeeping
- assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
- assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
- assertEquals(15, scheduler.getNumberOfUnconstrainedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+ assertEquals(15, testingSlotProvider.getNumberOfUnconstrainedAssignments());
}
catch (Exception e) {
e.printStackTrace();
@@ -440,22 +434,21 @@ public class SchedulerSlotSharingTest extends TestLogger {
JobVertexID jid3 = new JobVertexID();
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
- Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
- scheduler.newInstanceAvailable(getRandomInstance(2));
-
+
+ testingSlotProvider.addTaskManager(2);
+
// schedule 1 tasks from the first vertex group and 2 from the second
- LogicalSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
assertNotNull(s1_1);
assertNotNull(s2_1);
assertNotNull(s2_2);
- assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
+ assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(1, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+ assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
// release the two from the second
s2_1.releaseSlot();
@@ -463,17 +456,17 @@ public class SchedulerSlotSharingTest extends TestLogger {
// this should free one slot so we can allocate one non-shared
- LogicalSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false, Collections.emptyList()).get();
+ LogicalSlot sx = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, null)), false, Collections.emptyList()).get();
assertNotNull(sx);
- assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
- assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
+ assertEquals(1, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
+ assertEquals(1, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
// check the scheduler's bookkeeping
- assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
- assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
- assertEquals(4, scheduler.getNumberOfUnconstrainedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+ assertEquals(4, testingSlotProvider.getNumberOfUnconstrainedAssignments());
}
catch (Exception e) {
e.printStackTrace();
@@ -492,34 +485,33 @@ public class SchedulerSlotSharingTest extends TestLogger {
JobVertexID jidC = new JobVertexID();
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
- Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
- scheduler.newInstanceAvailable(getRandomInstance(3));
- scheduler.newInstanceAvailable(getRandomInstance(2));
-
+
+ testingSlotProvider.addTaskManager(3);
+ testingSlotProvider.addTaskManager(2);
+
// schedule some individual vertices
- LogicalSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false, Collections.emptyList()).get();
- LogicalSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false, Collections.emptyList()).get();
+ LogicalSlot sA2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2, null)), false, Collections.emptyList()).get();
+ LogicalSlot sA1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2, null)), false, Collections.emptyList()).get();
assertNotNull(sA1);
assertNotNull(sA2);
// schedule some vertices in the sharing group
- LogicalSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s1_0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2_0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
assertNotNull(s1_0);
assertNotNull(s1_1);
assertNotNull(s2_0);
assertNotNull(s2_1);
// schedule another isolated vertex
- LogicalSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false, Collections.emptyList()).get();
+ LogicalSlot sB1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3, null)), false, Collections.emptyList()).get();
assertNotNull(sB1);
// should not be able to schedule more vertices
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -530,7 +522,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
}
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -541,7 +533,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
}
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get();
+ testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -552,7 +544,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
}
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false, Collections.emptyList()).get();
+ testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1, null)), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -565,8 +557,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
// release some isolated task and check that the sharing group may grow
sA1.releaseSlot();
- LogicalSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
assertNotNull(s1_2);
assertNotNull(s2_2);
@@ -575,22 +567,22 @@ public class SchedulerSlotSharingTest extends TestLogger {
s1_1.releaseSlot();
s2_0.releaseSlot();
- assertEquals(1, scheduler.getNumberOfAvailableSlots());
+ assertEquals(1, testingSlotProvider.getNumberOfAvailableSlots());
// schedule one more no-shared task
- LogicalSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get();
+ LogicalSlot sB0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, Collections.emptyList()).get();
assertNotNull(sB0);
// release the last of the original shared slots and allocate one more non-shared slot
s2_1.releaseSlot();
- LogicalSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false, Collections.emptyList()).get();
+ LogicalSlot sB2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3, null)), false, Collections.emptyList()).get();
assertNotNull(sB2);
// release on non-shared and add some shared slots
sA2.releaseSlot();
- LogicalSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
assertNotNull(s1_3);
assertNotNull(s2_3);
@@ -600,8 +592,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
s1_3.releaseSlot();
s2_3.releaseSlot();
- LogicalSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false, Collections.emptyList()).get();
- LogicalSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false, Collections.emptyList()).get();
+ LogicalSlot sC0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2, null)), false, Collections.emptyList()).get();
+ LogicalSlot sC1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2, null)), false, Collections.emptyList()).get();
assertNotNull(sC0);
assertNotNull(sC1);
@@ -613,12 +605,12 @@ public class SchedulerSlotSharingTest extends TestLogger {
sC1.releaseSlot();
// test that everything is released
- assertEquals(5, scheduler.getNumberOfAvailableSlots());
+ assertEquals(5, testingSlotProvider.getNumberOfAvailableSlots());
// check the scheduler's bookkeeping
- assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
- assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
- assertEquals(15, scheduler.getNumberOfUnconstrainedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+ assertEquals(15, testingSlotProvider.getNumberOfUnconstrainedAssignments());
}
catch (Exception e) {
e.printStackTrace();
@@ -637,41 +629,34 @@ public class SchedulerSlotSharingTest extends TestLogger {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
- Instance i1 = getRandomInstance(2);
- Instance i2 = getRandomInstance(2);
-
- TaskManagerLocation loc1 = i1.getTaskManagerLocation();
- TaskManagerLocation loc2 = i2.getTaskManagerLocation();
-
- Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
- scheduler.newInstanceAvailable(i1);
- scheduler.newInstanceAvailable(i2);
-
+ TaskManagerLocation loc1 = testingSlotProvider.addTaskManager(2);
+ TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2);
// schedule one to each instance
- LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
- LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
+ LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+ LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2)).get();
assertNotNull(s1);
assertNotNull(s2);
- assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(1, i1.getNumberOfAvailableSlots());
- assertEquals(1, i2.getNumberOfAvailableSlots());
+ assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(loc1, s1.getTaskManagerLocation());
+ assertEquals(loc2, s2.getTaskManagerLocation());
// schedule one from the other group to each instance
- LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
- LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
+ LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+ LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2)).get();
assertNotNull(s3);
assertNotNull(s4);
- assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(1, i1.getNumberOfAvailableSlots());
- assertEquals(1, i2.getNumberOfAvailableSlots());
+ assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(loc1, s3.getTaskManagerLocation());
+ assertEquals(loc2, s4.getTaskManagerLocation());
+ assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots());
// check the scheduler's bookkeeping
- assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
- assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
- assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
+ assertEquals(4, testingSlotProvider.getNumberOfLocalizedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfUnconstrainedAssignments());
}
catch (Exception e) {
e.printStackTrace();
@@ -690,41 +675,33 @@ public class SchedulerSlotSharingTest extends TestLogger {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
- Instance i1 = getRandomInstance(2);
- Instance i2 = getRandomInstance(2);
-
- TaskManagerLocation loc1 = i1.getTaskManagerLocation();
- TaskManagerLocation loc2 = i2.getTaskManagerLocation();
+ TaskManagerLocation loc1 = testingSlotProvider.addTaskManager(2);
+ TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2);
- Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
- scheduler.newInstanceAvailable(i1);
- scheduler.newInstanceAvailable(i2);
-
-
// schedule one to each instance
- LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
- LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+ LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
assertNotNull(s1);
assertNotNull(s2);
- assertEquals(2, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(0, i1.getNumberOfAvailableSlots());
- assertEquals(2, i2.getNumberOfAvailableSlots());
+ assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(loc1, s1.getTaskManagerLocation());
+ assertEquals(loc1, s2.getTaskManagerLocation());
// schedule one from the other group to each instance
- LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
- LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
+ LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2)).get();
+ LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc2)).get();
assertNotNull(s3);
assertNotNull(s4);
- assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
- assertEquals(0, i1.getNumberOfAvailableSlots());
- assertEquals(0, i2.getNumberOfAvailableSlots());
+ assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
+ assertEquals(loc2, s3.getTaskManagerLocation());
+ assertEquals(loc2, s4.getTaskManagerLocation());
// check the scheduler's bookkeeping
- assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
- assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
- assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
+ assertEquals(4, testingSlotProvider.getNumberOfLocalizedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
+ assertEquals(0, testingSlotProvider.getNumberOfUnconstrainedAssignments());
}
catch (Exception e) {
e.printStackTrace();
@@ -743,24 +720,18 @@ public class SchedulerSlotSharingTest extends TestLogger {
SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
- Instance i1 = getRandomInstance(2);
- Instance i2 = getRandomInstance(2);
+ TaskManagerLocation loc1 = testingSlotProvider.addTaskManager(2);
+ TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2);
- TaskManagerLocation loc1 = i1.getTaskManagerLocation();
-
- Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
- scheduler.newInstanceAvailable(i1);
- scheduler.newInstanceAvailable(i2);
-
// schedule until the one instance is full
- LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
- LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
- LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
- LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+ LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+ LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+ LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
// schedule two more with preference of same instance --> need to go to other instance
- LogicalSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
- LogicalSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
+ LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, Collections.singleton(loc1)).get();
assertNotNull(s1);
assertNotNull(s2);
@@ -769,22 +740,21 @@ public class SchedulerSlotSharingTest extends TestLogger {
assertNotNull(s5);
assertNotNull(s6);
- assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
-
- assertEquals(0, i1.getNumberOfAvailableSlots());
- assertEquals(0, i2.getNumberOfAvailableSlots());
-
- assertEquals(i1.getTaskManagerLocation(), s1.getTaskManagerLocation());
- assertEquals(i1.getTaskManagerLocation(), s2.getTaskManagerLocation());
- assertEquals(i1.getTaskManagerLocation(), s3.getTaskManagerLocation());
- assertEquals(i1.getTaskManagerLocation(), s4.getTaskManagerLocation());
- assertEquals(i2.getTaskManagerLocation(), s5.getTaskManagerLocation());
- assertEquals(i2.getTaskManagerLocation(), s6.getTaskManagerLocation());
+ assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
+
+ assertEquals(loc1, s1.getTaskManagerLocation());
+ assertEquals(loc1, s2.getTaskManagerLocation());
+ assertEquals(loc1, s3.getTaskManagerLocation());
+ assertEquals(loc1, s4.getTaskManagerLocation());
+ assertEquals(loc2, s5.getTaskManagerLocation());
+ assertEquals(loc2, s6.getTaskManagerLocation());
// check the scheduler's bookkeeping
- assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
- assertEquals(2, scheduler.getNumberOfNonLocalizedAssignments());
- assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
+ assertEquals(4, testingSlotProvider.getNumberOfLocalizedAssignments());
+ // Flip-6 supports host localized assignments which happen in this case because all TaskManagerLocations point to the loopback address
+ assertTrue(2 == testingSlotProvider.getNumberOfNonLocalizedAssignments() || 2 == testingSlotProvider.getNumberOfHostLocalizedAssignments());
+
+ assertEquals(0, testingSlotProvider.getNumberOfUnconstrainedAssignments());
}
catch (Exception e) {
e.printStackTrace();
@@ -802,23 +772,22 @@ public class SchedulerSlotSharingTest extends TestLogger {
final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
- final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
- scheduler.newInstanceAvailable(getRandomInstance(4));
-
+ testingSlotProvider.addTaskManager(4);
+
// allocate something from group 1 and 2 interleaved with schedule for group 3
- LogicalSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot slot_1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot slot_1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
- LogicalSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot slot_2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot slot_2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
- LogicalSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot slot_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
- LogicalSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot slot_1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot slot_1_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
- LogicalSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot slot_2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot slot_2_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
// release groups 1 and 2
@@ -834,10 +803,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
// allocate group 4
- LogicalSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot slot_4_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot slot_4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot slot_4_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot slot_4_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
// release groups 3 and 4
@@ -859,6 +828,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
final ExecutorService executor = Executors.newFixedThreadPool(20);
try {
+ testingSlotProvider.addTaskManager(4);
+
for (int run = 0; run < 50; run++) {
final JobVertexID jid1 = new JobVertexID();
final JobVertexID jid2 = new JobVertexID();
@@ -866,10 +837,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
final JobVertexID jid4 = new JobVertexID();
final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
-
- final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
- scheduler.newInstanceAvailable(getRandomInstance(4));
-
+
final AtomicInteger enumerator1 = new AtomicInteger();
final AtomicInteger enumerator2 = new AtomicInteger();
final AtomicBoolean flag3 = new AtomicBoolean();
@@ -883,13 +851,11 @@ public class SchedulerSlotSharingTest extends TestLogger {
// use atomic integer as a mutable integer reference
final AtomicInteger completed = new AtomicInteger();
-
final Runnable deploy4 = new Runnable() {
@Override
public void run() {
try {
- LogicalSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
-
+ LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
sleepUninterruptibly(rnd.nextInt(5));
slot.releaseSlot();
@@ -911,8 +877,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
public void run() {
try {
if (flag3.compareAndSet(false, true)) {
- LogicalSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get();
-
+ LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
sleepUninterruptibly(5);
executor.execute(deploy4);
@@ -940,8 +905,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
@Override
public void run() {
try {
- LogicalSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
-
+ LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+
// wait a bit till scheduling the successor
sleepUninterruptibly(rnd.nextInt(5));
executor.execute(deploy3);
@@ -967,8 +932,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
@Override
public void run() {
try {
- LogicalSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
-
+ LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+
// wait a bit till scheduling the successor
sleepUninterruptibly(rnd.nextInt(5));
executor.execute(deploy2);
@@ -1012,14 +977,12 @@ public class SchedulerSlotSharingTest extends TestLogger {
assertFalse("Thread failed", failed.get());
- while (scheduler.getNumberOfAvailableSlots() < 4) {
+ while (testingSlotProvider.getNumberOfAvailableSlots() < 4) {
sleepUninterruptibly(5);
}
- assertEquals(1, scheduler.getNumberOfAvailableInstances());
- assertEquals(1, scheduler.getNumberOfInstancesWithAvailableSlots());
- assertEquals(4, scheduler.getNumberOfAvailableSlots());
- assertEquals(13, scheduler.getNumberOfUnconstrainedAssignments());
+ assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
+ assertEquals(13 * (run + 1), testingSlotProvider.getNumberOfUnconstrainedAssignments());
}
}
catch (Exception e) {
@@ -1042,27 +1005,27 @@ public class SchedulerSlotSharingTest extends TestLogger {
scheduler.newInstanceAvailable(getRandomInstance(4));
// schedule one task for the first and second vertex
- LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
assertEquals( s1.getTaskManagerLocation(), s2.getTaskManagerLocation() );
assertEquals(3, scheduler.getNumberOfAvailableSlots());
- LogicalSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
s1.releaseSlot();
s2.releaseSlot();
- LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get();
- LogicalSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+ LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
+ LogicalSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false, Collections.emptyList()).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, Collections.emptyList()).get();
fail("should throw an exception");
}
catch (ExecutionException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
new file mode 100644
index 0000000..8fd5f9e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+
+ @Test
+ public void testAddAndRemoveInstance() {
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+ Instance i1 = getRandomInstance(2);
+ Instance i2 = getRandomInstance(2);
+ Instance i3 = getRandomInstance(2);
+
+ assertEquals(0, scheduler.getNumberOfAvailableInstances());
+ assertEquals(0, scheduler.getNumberOfAvailableSlots());
+ scheduler.newInstanceAvailable(i1);
+ assertEquals(1, scheduler.getNumberOfAvailableInstances());
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+ scheduler.newInstanceAvailable(i2);
+ assertEquals(2, scheduler.getNumberOfAvailableInstances());
+ assertEquals(4, scheduler.getNumberOfAvailableSlots());
+ scheduler.newInstanceAvailable(i3);
+ assertEquals(3, scheduler.getNumberOfAvailableInstances());
+ assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+ // cannot add available instance again
+ try {
+ scheduler.newInstanceAvailable(i2);
+ fail("Scheduler accepted instance twice");
+ }
+ catch (IllegalArgumentException e) {
+ // bueno!
+ }
+
+ // some instances die
+ assertEquals(3, scheduler.getNumberOfAvailableInstances());
+ assertEquals(6, scheduler.getNumberOfAvailableSlots());
+ scheduler.instanceDied(i2);
+ assertEquals(2, scheduler.getNumberOfAvailableInstances());
+ assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+ // try to add a dead instance
+ try {
+ scheduler.newInstanceAvailable(i2);
+ fail("Scheduler accepted dead instance");
+ }
+ catch (IllegalArgumentException e) {
+ // stimmt
+
+ }
+
+ scheduler.instanceDied(i1);
+ assertEquals(1, scheduler.getNumberOfAvailableInstances());
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+ scheduler.instanceDied(i3);
+ assertEquals(0, scheduler.getNumberOfAvailableInstances());
+ assertEquals(0, scheduler.getNumberOfAvailableSlots());
+
+ assertFalse(i1.isAlive());
+ assertFalse(i2.isAlive());
+ assertFalse(i3.isAlive());
+ }
+}