You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/02 17:01:13 UTC
[2/4] flink git commit: [FLINK-7153] Introduce LocationPreferenceConstraint for scheduling
http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index 9f4a675..2d35ce2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.util.Collections;
import java.util.concurrent.ExecutionException;
public class ScheduleWithCoLocationHintTest extends TestLogger {
@@ -66,18 +67,18 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
CoLocationConstraint c6 = new CoLocationConstraint(ccg);
// schedule 4 tasks from the first vertex group
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false).get();
- SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false).get();
- SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false).get();
- SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false).get();
- SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false).get();
- SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false).get();
- SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false).get();
- SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false).get();
- SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false).get();
- SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false).get();
- SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false, Collections.emptyList()).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false, Collections.emptyList()).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false, Collections.emptyList()).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false, Collections.emptyList()).get();
+ SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false, Collections.emptyList()).get();
+ SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false, Collections.emptyList()).get();
+ SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false, Collections.emptyList()).get();
+ SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false, Collections.emptyList()).get();
+ SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false, Collections.emptyList()).get();
+ SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false, Collections.emptyList()).get();
+ SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false, Collections.emptyList()).get();
+ SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false, Collections.emptyList()).get();
assertNotNull(s1);
assertNotNull(s2);
@@ -140,7 +141,7 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
assertTrue(scheduler.getNumberOfAvailableSlots() >= 1);
SimpleSlot single = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)), false).get();
+ new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)), false, Collections.emptyList()).get();
assertNotNull(single);
s1.releaseSlot();
@@ -188,11 +189,11 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
SimpleSlot s1 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get();
+ new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
SimpleSlot s2 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false).get();
+ new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
- SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false).get();
+ SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false, Collections.emptyList()).get();
ResourceID taskManager = s1.getTaskManagerID();
@@ -201,7 +202,7 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
sSolo.releaseSlot();
SimpleSlot sNew = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
+ new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
assertEquals(taskManager, sNew.getTaskManagerID());
assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
@@ -235,14 +236,14 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
SimpleSlot s1 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get();
+ new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
s1.releaseSlot();
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false).get();
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false, Collections.emptyList()).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false, Collections.emptyList()).get();
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
fail("Scheduled even though no resource was available.");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof NoResourceAvailableException);
@@ -283,35 +284,35 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
SlotSharingGroup shareGroup = new SlotSharingGroup();
// first wave
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false);
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false);
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false);
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false);
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false, Collections.emptyList());
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false, Collections.emptyList());
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false, Collections.emptyList());
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false, Collections.emptyList());
// second wave
SimpleSlot s21 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false).get();
+ new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false, Collections.emptyList()).get();
SimpleSlot s22 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false).get();
+ new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false, Collections.emptyList()).get();
SimpleSlot s23 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false).get();
+ new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false, Collections.emptyList()).get();
SimpleSlot s24 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false).get();
+ new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false, Collections.emptyList()).get();
// third wave
SimpleSlot s31 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false).get();
+ new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false, Collections.emptyList()).get();
SimpleSlot s32 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false).get();
+ new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false, Collections.emptyList()).get();
SimpleSlot s33 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false).get();
+ new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false, Collections.emptyList()).get();
SimpleSlot s34 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false).get();
+ new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false, Collections.emptyList()).get();
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false);
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false);
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false);
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false);
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false, Collections.emptyList());
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false, Collections.emptyList());
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false, Collections.emptyList());
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false, Collections.emptyList());
assertEquals(s21.getTaskManagerID(), s34.getTaskManagerID());
assertEquals(s22.getTaskManagerID(), s31.getTaskManagerID());
@@ -357,25 +358,25 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
// schedule something into the shared group so that both instances are in the sharing group
SimpleSlot s1 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
SimpleSlot s2 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
// schedule one locally to instance 1
SimpleSlot s3 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
// schedule with co location constraint (yet unassigned) and a preference for
// instance 1, but it can only get instance 2
SimpleSlot s4 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
// schedule something into the assigned co-location constraints and check that they override the
// other preferences
SimpleSlot s5 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get();
SimpleSlot s6 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
// check that each slot got three
assertEquals(3, s1.getRoot().getNumberLeaves());
@@ -434,9 +435,9 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
SimpleSlot s1 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
SimpleSlot s2 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.singleton(loc2)).get();
s1.releaseSlot();
s2.releaseSlot();
@@ -445,9 +446,9 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
SimpleSlot s3 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get();
SimpleSlot s4 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
// still preserves the previous instance mapping)
assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID());
@@ -495,9 +496,9 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
SimpleSlot s1 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
SimpleSlot s2 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.singleton(loc2)).get();
s1.releaseSlot();
s2.releaseSlot();
@@ -506,13 +507,13 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
SimpleSlot sa = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false, Collections.emptyList()).get();
SimpleSlot sb = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false, Collections.emptyList()).get();
try {
scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get();
fail("should not be able to find a resource");
}
catch (ExecutionException e) {
@@ -565,14 +566,14 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
// and give locality preferences that hint at using the same shared slot for both
// co location constraints (which we seek to prevent)
SimpleSlot s1 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
SimpleSlot s2 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
SimpleSlot s3 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
SimpleSlot s4 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
// check that each slot got three
assertEquals(2, s1.getRoot().getNumberLeaves());
@@ -631,14 +632,14 @@ public class ScheduleWithCoLocationHintTest extends TestLogger {
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
SimpleSlot s1 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.emptyList()).get();
SimpleSlot s2 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.emptyList()).get();
SimpleSlot s3 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.emptyList()).get();
SimpleSlot s4 = scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false).get();
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false, Collections.emptyList()).get();
// check that each slot got two
assertEquals(2, s1.getRoot().getNumberLeaves());
http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index a05c1a3..7882f4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -123,17 +125,17 @@ public class SchedulerIsolatedTasksTest {
assertEquals(5, scheduler.getNumberOfAvailableSlots());
// schedule something into all slots
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
- SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
- SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
- SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+ SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
// the slots should all be different
assertTrue(areAllDistinct(s1, s2, s3, s4, s5));
try {
- scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
fail("Scheduler accepted scheduling request without available resource.");
}
catch (ExecutionException e) {
@@ -146,8 +148,8 @@ public class SchedulerIsolatedTasksTest {
assertEquals(2, scheduler.getNumberOfAvailableSlots());
// now we can schedule some more slots
- SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
- SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+ SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
+ SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7));
@@ -235,7 +237,7 @@ public class SchedulerIsolatedTasksTest {
disposeThread.start();
for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
- CompletableFuture<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
+ CompletableFuture<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true, Collections.emptyList());
future.thenAcceptAsync(
(SimpleSlot slot) -> {
synchronized (toRelease) {
@@ -284,11 +286,11 @@ public class SchedulerIsolatedTasksTest {
scheduler.newInstanceAvailable(i3);
List<SimpleSlot> slots = new ArrayList<SimpleSlot>();
- slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
- slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
- slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
- slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
- slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
+ slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
+ slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
+ slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
+ slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
+ slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
i2.markDead();
@@ -309,7 +311,7 @@ public class SchedulerIsolatedTasksTest {
// cannot get another slot, since all instances are dead
try {
- scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
fail("Scheduler served a slot from a dead instance");
}
catch (ExecutionException e) {
@@ -344,7 +346,7 @@ public class SchedulerIsolatedTasksTest {
scheduler.newInstanceAvailable(i3);
// schedule something on an arbitrary instance
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false, Collections.emptyList()).get();
// figure out how we use the location hints
Instance first = (Instance) s1.getOwner();
@@ -352,28 +354,28 @@ public class SchedulerIsolatedTasksTest {
Instance third = first == i3 ? i2 : i3;
// something that needs to go to the first instance again
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false, Collections.singleton(s1.getTaskManagerLocation())).get();
assertEquals(first, s2.getOwner());
// first or second --> second, because first is full
- SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false, Arrays.asList(first.getTaskManagerLocation(), second.getTaskManagerLocation())).get();
assertEquals(second, s3.getOwner());
// first or third --> third (because first is full)
- SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
- SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
+ SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
assertEquals(third, s4.getOwner());
assertEquals(third, s5.getOwner());
// first or third --> second, because all others are full
- SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
+ SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
assertEquals(second, s6.getOwner());
// release something on the first and second instance
s2.releaseSlot();
s6.releaseSlot();
- SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
+ SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
assertEquals(first, s7.getOwner());
assertEquals(1, scheduler.getNumberOfUnconstrainedAssignments());
http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index 1f88dd8..a478eb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.util.Collections;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -64,10 +65,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
scheduler.newInstanceAvailable(i2);
// schedule 4 tasks from the first vertex group
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false).get();
- SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false).get();
- SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1);
assertNotNull(s2);
@@ -78,7 +79,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
// we cannot schedule another task from the first vertex group
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -92,7 +93,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
s3.releaseSlot();
// allocate another slot from that group
- SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get();
+ SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s5);
// release all old slots
@@ -100,9 +101,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
s2.releaseSlot();
s4.releaseSlot();
- SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false).get();
- SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false).get();
- SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false).get();
+ SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s6);
assertNotNull(s7);
@@ -149,10 +150,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
scheduler.newInstanceAvailable(getRandomInstance(2));
// schedule 4 tasks from the first vertex group
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false).get();
- SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false).get();
- SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1);
assertNotNull(s2);
@@ -163,7 +164,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
// we cannot schedule another task from the first vertex group
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -174,10 +175,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
}
// schedule some tasks from the second ID group
- SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false).get();
- SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false).get();
- SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false).get();
- SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false).get();
+ SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1_2);
assertNotNull(s2_2);
@@ -186,7 +187,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
// we cannot schedule another task from the second vertex group
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -207,7 +208,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
// we can still not schedule anything from the second group of vertices
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -218,7 +219,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
}
// we can schedule something from the first vertex group
- SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
+ SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s5);
assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -228,7 +229,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
// now we release a slot from the second vertex group and schedule another task from that group
s2_2.releaseSlot();
- SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
+ SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s5_2);
// release all slots
@@ -269,10 +270,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
scheduler.newInstanceAvailable(getRandomInstance(2));
// schedule 4 tasks from the first vertex group
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
- SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
- SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
@@ -288,10 +289,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
// schedule some tasks from the second ID group
- SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
- SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
- SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
- SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
+ SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
@@ -334,10 +335,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
scheduler.newInstanceAvailable(getRandomInstance(2));
// schedule 4 tasks from the first vertex group
- SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
- SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
- SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
- SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
+ SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1_1);
assertNotNull(s2_1);
@@ -347,10 +348,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
// schedule 4 tasks from the second vertex group
- SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false).get();
- SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false).get();
- SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false).get();
- SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false).get();
+ SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1_2);
assertNotNull(s2_2);
@@ -360,10 +361,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
// schedule 4 tasks from the third vertex group
- SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false).get();
- SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false).get();
- SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false).get();
- SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false).get();
+ SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1_3);
assertNotNull(s2_3);
@@ -375,7 +376,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
// we cannot schedule another task from the second vertex group
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -391,9 +392,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
s3_2.releaseSlot();
s4_2.releaseSlot();
- SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false).get();
- SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false).get();
- SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false).get();
+ SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s5_2);
assertNotNull(s6_2);
@@ -444,9 +445,9 @@ public class SchedulerSlotSharingTest extends TestLogger {
scheduler.newInstanceAvailable(getRandomInstance(2));
// schedule 1 tasks from the first vertex group and 2 from the second
- SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false).get();
- SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false).get();
- SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false).get();
+ SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1_1);
assertNotNull(s2_1);
@@ -462,7 +463,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
// this should free one slot so we can allocate one non-shared
- SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false).get();
+ SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false, Collections.emptyList()).get();
assertNotNull(sx);
assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -497,28 +498,28 @@ public class SchedulerSlotSharingTest extends TestLogger {
scheduler.newInstanceAvailable(getRandomInstance(2));
// schedule some individual vertices
- SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false).get();
- SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false).get();
+ SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false, Collections.emptyList()).get();
+ SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false, Collections.emptyList()).get();
assertNotNull(sA1);
assertNotNull(sA2);
// schedule some vertices in the sharing group
- SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
- SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
- SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
- SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
+ SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1_0);
assertNotNull(s1_1);
assertNotNull(s2_0);
assertNotNull(s2_1);
// schedule another isolated vertex
- SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false).get();
+ SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false, Collections.emptyList()).get();
assertNotNull(sB1);
// should not be able to schedule more vertices
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -529,7 +530,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
}
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -540,7 +541,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
}
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -551,7 +552,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
}
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -564,8 +565,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
// release some isolated task and check that the sharing group may grow
sA1.releaseSlot();
- SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
- SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
+ SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1_2);
assertNotNull(s2_2);
@@ -577,19 +578,19 @@ public class SchedulerSlotSharingTest extends TestLogger {
assertEquals(1, scheduler.getNumberOfAvailableSlots());
// schedule one more no-shared task
- SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get();
+ SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get();
assertNotNull(sB0);
// release the last of the original shared slots and allocate one more non-shared slot
s2_1.releaseSlot();
- SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false).get();
+ SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false, Collections.emptyList()).get();
assertNotNull(sB2);
// release on non-shared and add some shared slots
sA2.releaseSlot();
- SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
- SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
+ SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1_3);
assertNotNull(s2_3);
@@ -599,8 +600,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
s1_3.releaseSlot();
s2_3.releaseSlot();
- SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false).get();
- SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false).get();
+ SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false, Collections.emptyList()).get();
+ SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false, Collections.emptyList()).get();
assertNotNull(sC0);
assertNotNull(sC1);
@@ -648,8 +649,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
// schedule one to each instance
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
assertNotNull(s1);
assertNotNull(s2);
@@ -658,8 +659,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
assertEquals(1, i2.getNumberOfAvailableSlots());
// schedule one from the other group to each instance
- SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get();
- SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
assertNotNull(s3);
assertNotNull(s4);
@@ -701,8 +702,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
// schedule one to each instance
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
assertNotNull(s1);
assertNotNull(s2);
@@ -711,8 +712,8 @@ public class SchedulerSlotSharingTest extends TestLogger {
assertEquals(2, i2.getNumberOfAvailableSlots());
// schedule one from the other group to each instance
- SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false).get();
- SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
assertNotNull(s3);
assertNotNull(s4);
@@ -752,14 +753,14 @@ public class SchedulerSlotSharingTest extends TestLogger {
scheduler.newInstanceAvailable(i2);
// schedule until the one instance is full
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get();
- SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false).get();
- SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
// schedule two more with preference of same instance --> need to go to other instance
- SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false).get();
- SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false).get();
+ SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
assertNotNull(s1);
assertNotNull(s2);
@@ -805,19 +806,19 @@ public class SchedulerSlotSharingTest extends TestLogger {
scheduler.newInstanceAvailable(getRandomInstance(4));
// allocate something from group 1 and 2 interleaved with schedule for group 3
- SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
- SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
+ SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
- SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
- SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
+ SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
- SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get();
+ SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get();
- SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
- SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
+ SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
- SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
- SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
+ SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
// release groups 1 and 2
@@ -833,10 +834,10 @@ public class SchedulerSlotSharingTest extends TestLogger {
// allocate group 4
- SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get();
- SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get();
- SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get();
- SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
+ SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get();
// release groups 3 and 4
@@ -887,7 +888,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
@Override
public void run() {
try {
- SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false).get();
+ SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
sleepUninterruptibly(rnd.nextInt(5));
slot.releaseSlot();
@@ -910,7 +911,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
public void run() {
try {
if (flag3.compareAndSet(false, true)) {
- SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get();
+ SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get();
sleepUninterruptibly(5);
@@ -939,7 +940,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
@Override
public void run() {
try {
- SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false).get();
+ SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
// wait a bit till scheduling the successor
sleepUninterruptibly(rnd.nextInt(5));
@@ -966,7 +967,7 @@ public class SchedulerSlotSharingTest extends TestLogger {
@Override
public void run() {
try {
- SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false).get();
+ SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
// wait a bit till scheduling the successor
sleepUninterruptibly(rnd.nextInt(5));
@@ -1041,27 +1042,27 @@ public class SchedulerSlotSharingTest extends TestLogger {
scheduler.newInstanceAvailable(getRandomInstance(4));
// schedule one task for the first and second vertex
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false, Collections.emptyList()).get();
assertTrue( s1.getParent() == s2.getParent() );
assertEquals(3, scheduler.getNumberOfAvailableSlots());
- SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false).get();
- SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false).get();
- SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get();
- SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get();
+ SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get();
s1.releaseSlot();
s2.releaseSlot();
- SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false).get();
- SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false).get();
- SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get();
- SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
+ SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get();
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false, Collections.emptyList()).get();
fail("should throw an exception");
}
catch (ExecutionException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index c7d0f09..98dca03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmanager.scheduler;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -106,20 +107,26 @@ public class SchedulerTestUtils {
public static Execution getTestVertex(Iterable<TaskManagerLocation> preferredLocations) {
- ExecutionVertex vertex = mock(ExecutionVertex.class);
-
Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = new ArrayList<>(4);
for (TaskManagerLocation preferredLocation : preferredLocations) {
preferredLocationFutures.add(CompletableFuture.completedFuture(preferredLocation));
}
+
+ return getTestVertex(preferredLocationFutures);
+ }
+
+ public static Execution getTestVertex(Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures) {
+ ExecutionVertex vertex = mock(ExecutionVertex.class);
+
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures);
when(vertex.getJobId()).thenReturn(new JobID());
when(vertex.toString()).thenReturn("TEST-VERTEX");
-
+
Execution execution = mock(Execution.class);
when(execution.getVertex()).thenReturn(vertex);
-
+ when(execution.calculatePreferredLocations(any(LocationPreferenceConstraint.class))).thenCallRealMethod();
+
return execution;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
new file mode 100644
index 0000000..60dddbb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import java.net.InetAddress;
+
+/**
+ * Dummy local task manager location for testing purposes.
+ */
+public class LocalTaskManagerLocation extends TaskManagerLocation {
+
+ private static final long serialVersionUID = 2396142513336559461L;
+
+ public LocalTaskManagerLocation() {
+ super(ResourceID.generate(), InetAddress.getLoopbackAddress(), -1);
+ }
+}