You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:11 UTC
[47/63] [abbrv] git commit: Add options strict co-location
constraints to scheduler
Add options strict co-location constraints to scheduler
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/91871757
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/91871757
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/91871757
Branch: refs/heads/master
Commit: 91871757bebf7404324334d79755e3f117752966
Parents: b3c30ca
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 15 00:39:00 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:49 2014 +0200
----------------------------------------------------------------------
.../scheduler/CoLocationConstraint.java | 64 ++++++
.../jobmanager/scheduler/ScheduledUnit.java | 36 +--
.../runtime/jobmanager/scheduler/Scheduler.java | 58 ++++-
.../jobmanager/scheduler/SharedSlot.java | 27 ++-
.../runtime/jobmanager/scheduler/SubSlot.java | 11 +-
.../ScheduleWithCoLocationHintTest.java | 225 +++++++++++++++++++
6 files changed, 400 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/91871757/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
new file mode 100644
index 0000000..26332c8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+public class CoLocationConstraint {
+
+ private static final AtomicReferenceFieldUpdater<CoLocationConstraint, SharedSlot> UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(CoLocationConstraint.class, SharedSlot.class, "slot");
+
+ private volatile SharedSlot slot;
+
+
+ public boolean isUnassigned() {
+ return slot == null;
+ }
+
+ public SharedSlot getSlot() {
+ return slot;
+ }
+
+ public SharedSlot swapInNewSlot(AllocatedSlot newSlot) {
+ SharedSlot newShared = new SharedSlot(newSlot);
+
+ // atomic swap/release-other to prevent resource leaks
+ while (true) {
+ SharedSlot current = this.slot;
+ if (UPDATER.compareAndSet(this, current, newShared)) {
+ if (current != null) {
+ current.rease();
+ }
+ return newShared;
+ }
+ }
+ }
+
+ public SubSlot allocateSubSlot(JobVertexID jid) {
+ if (this.slot == null) {
+ throw new IllegalStateException("Location constraint has not yet been assigned a slot.");
+ }
+
+ return slot.allocateSubSlot(jid);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/91871757/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
index 10190f5..28fd916 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
@@ -21,35 +21,40 @@ package org.apache.flink.runtime.jobmanager.scheduler;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import com.google.common.base.Preconditions;
+
public class ScheduledUnit {
private final Execution vertexExecution;
private final SlotSharingGroup sharingGroup;
+ private final CoLocationConstraint locationConstraint;
+
// --------------------------------------------------------------------------------------------
- public ScheduledUnit(Execution taskVertex) {
- if (taskVertex == null) {
- throw new NullPointerException();
- }
+ public ScheduledUnit(Execution task) {
+ Preconditions.checkNotNull(task);
- this.vertexExecution = taskVertex;
+ this.vertexExecution = task;
this.sharingGroup = null;
+ this.locationConstraint = null;
}
- public ScheduledUnit(Execution taskVertex, SlotSharingGroup sharingUnit) {
- if (taskVertex == null) {
- throw new NullPointerException();
- }
+ public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit) {
+ Preconditions.checkNotNull(task);
- this.vertexExecution = taskVertex;
+ this.vertexExecution = task;
this.sharingGroup = sharingUnit;
+ this.locationConstraint = null;
}
- ScheduledUnit() {
- this.vertexExecution = null;
+ public ScheduledUnit(Execution task, CoLocationConstraint locationConstraint) {
+ Preconditions.checkNotNull(task);
+
+ this.vertexExecution = task;
this.sharingGroup = null;
+ this.locationConstraint = locationConstraint;
}
// --------------------------------------------------------------------------------------------
@@ -65,11 +70,16 @@ public class ScheduledUnit {
public SlotSharingGroup getSlotSharingGroup() {
return sharingGroup;
}
+
+ public CoLocationConstraint getLocationConstraint() {
+ return locationConstraint;
+ }
// --------------------------------------------------------------------------------------------
@Override
public String toString() {
- return "{vertex=" + vertexExecution.getVertexWithAttempt() + ", sharingUnit=" + sharingGroup + '}';
+ return "{task=" + vertexExecution.getVertexWithAttempt() + ", sharingUnit=" + sharingGroup +
+ ", locationConstraint=" + locationConstraint + '}';
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/91871757/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index cec8fb7..eb2c0a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
@@ -149,13 +148,66 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
synchronized (globalLock) {
+
// 1) === If the task has a strict co-schedule hint, obey it ===
-
+
+ CoLocationConstraint locationConstraint = task.getLocationConstraint();
+ if (locationConstraint != null) {
+ // location constraints can never be scheduled in a queued fashion
+ if (queueIfNoResource) {
+ throw new IllegalArgumentException("A task with a location constraint was scheduled in a queued fashion.");
+ }
+
+ // since we are inside the global lock scope, we can check, allocate, and assign
+ // in one atomic action. however, slots may die and be deallocated
+
+ // (a) is the constraint has not yet has a slot, get one
+ if (locationConstraint.isUnassigned()) {
+ // try and get a slot
+ AllocatedSlot newSlot = getFreeSlotForTask(vertex);
+ if (newSlot == null) {
+ throw new NoResourceAvailableException();
+ }
+ SharedSlot sl = locationConstraint.swapInNewSlot(newSlot);
+ SubSlot slot = sl.allocateSubSlot(vertex.getJobvertexId());
+
+ updateLocalityCounters(newSlot.getLocality());
+ return slot;
+ }
+ else {
+ // try to get a subslot. returns null, if the location's slot has been released
+ // in the meantime
+ SubSlot slot = locationConstraint.allocateSubSlot(vertex.getJobvertexId());
+ if (slot == null) {
+ // get a new slot. at the same instance!!!
+ Instance location = locationConstraint.getSlot().getAllocatedSlot().getInstance();
+ AllocatedSlot newSlot;
+ try {
+ newSlot = location.allocateSlot(vertex.getJobId());
+ } catch (InstanceDiedException e) {
+ throw new NoResourceAvailableException("The instance of the required location died.");
+ }
+ if (newSlot == null) {
+ throw new NoResourceAvailableException();
+ }
+ SharedSlot sharedSlot = locationConstraint.swapInNewSlot(newSlot);
+ slot = sharedSlot.allocateSubSlot(vertex.getJobvertexId());
+ }
+
+ updateLocalityCounters(Locality.LOCAL);
+ return slot;
+ }
+ }
// 2) === If the task has a slot sharing group, schedule with shared slots ===
SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
if (sharingUnit != null) {
+
+ if (queueIfNoResource) {
+ throw new IllegalArgumentException("A task with a vertex sharing group was scheduled in a queued fashion.");
+ }
+
final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
AllocatedSlot newSlot = null;
@@ -263,7 +315,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
}
}
- if (instanceToUse == null) {
+ if (instanceToUse == null) {
instanceToUse = this.instancesWithAvailableResources.poll();
locality = Locality.NON_LOCAL;
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/91871757/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
index 36d8a8b..0f3687a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
@@ -48,6 +48,16 @@ public class SharedSlot {
this.subSlots = new HashSet<SubSlot>();
}
+ public SharedSlot(AllocatedSlot allocatedSlot) {
+ if (allocatedSlot == null) {
+ throw new NullPointerException();
+ }
+
+ this.allocatedSlot = allocatedSlot;
+ this.assignmentGroup = null;;
+ this.subSlots = new HashSet<SubSlot>();
+ }
+
// --------------------------------------------------------------------------------------------
public AllocatedSlot getAllocatedSlot() {
@@ -76,6 +86,17 @@ public class SharedSlot {
}
}
+ public void rease() {
+ synchronized (this.subSlots) {
+ disposed = true;
+ for (SubSlot ss : subSlots) {
+ ss.releaseSlot();
+ }
+ }
+
+ allocatedSlot.releaseSlot();
+ }
+
void returnAllocatedSlot(SubSlot slot) {
boolean release;
@@ -84,7 +105,11 @@ public class SharedSlot {
throw new IllegalArgumentException("Wrong shared slot for subslot.");
}
- release = assignmentGroup.sharedSlotAvailableForJid(this, slot.getJobVertexId(), this.subSlots.isEmpty());
+ if (assignmentGroup != null) {
+ release = assignmentGroup.sharedSlotAvailableForJid(this, slot.getJobVertexId(), this.subSlots.isEmpty());
+ } else {
+ release = subSlots.isEmpty();
+ }
if (release) {
disposed = true;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/91871757/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
index 003239d..ca2fb5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
@@ -45,10 +45,13 @@ public class SubSlot extends AllocatedSlot {
public void releaseSlot() {
// cancel everything, if there is something. since this is atomically status based,
// it will not happen twice if another attempt happened before or concurrently
- cancel();
-
- if (markReleased()) {
- this.sharedSlot.returnAllocatedSlot(this);
+ try {
+ cancel();
+ }
+ finally {
+ if (markReleased()) {
+ this.sharedSlot.returnAllocatedSlot(this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/91871757/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
new file mode 100644
index 0000000..0ee9346
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+
+public class ScheduleWithCoLocationHintTest {
+
+ @Test
+ public void schedule() {
+ try {
+ JobVertexID jid1 = new JobVertexID();
+ JobVertexID jid2 = new JobVertexID();
+
+ Scheduler scheduler = new Scheduler();
+
+ Instance i1 = getRandomInstance(2);
+ Instance i2 = getRandomInstance(2);
+ Instance i3 = getRandomInstance(2);
+
+ scheduler.newInstanceAvailable(i1);
+ scheduler.newInstanceAvailable(i2);
+ scheduler.newInstanceAvailable(i3);
+
+ assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+ CoLocationConstraint c1 = new CoLocationConstraint();
+ CoLocationConstraint c2 = new CoLocationConstraint();
+ CoLocationConstraint c3 = new CoLocationConstraint();
+ CoLocationConstraint c4 = new CoLocationConstraint();
+ CoLocationConstraint c5 = new CoLocationConstraint();
+ CoLocationConstraint c6 = new CoLocationConstraint();
+
+ // schedule 4 tasks from the first vertex group
+ AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), c1));
+ AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 6), c2));
+ AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 6), c3));
+ AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 6), c4));
+ AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 6), c1));
+ AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 6), c2));
+ AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 6), c3));
+ AllocatedSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 6), c5));
+ AllocatedSlot s9 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 6), c6));
+ AllocatedSlot s10 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 6), c4));
+ AllocatedSlot s11 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 6), c5));
+ AllocatedSlot s12 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 6), c6));
+
+ assertNotNull(s1);
+ assertNotNull(s2);
+ assertNotNull(s3);
+ assertNotNull(s4);
+ assertNotNull(s5);
+ assertNotNull(s6);
+ assertNotNull(s7);
+ assertNotNull(s8);
+ assertNotNull(s9);
+ assertNotNull(s10);
+ assertNotNull(s11);
+ assertNotNull(s12);
+
+ assertEquals(s1.getInstance(), s5.getInstance());
+ assertEquals(s2.getInstance(), s6.getInstance());
+ assertEquals(s3.getInstance(), s7.getInstance());
+ assertEquals(s4.getInstance(), s10.getInstance());
+ assertEquals(s8.getInstance(), s11.getInstance());
+ assertEquals(s9.getInstance(), s12.getInstance());
+
+ assertEquals(c1.getSlot().getAllocatedSlot().getInstance(), s1.getInstance());
+ assertEquals(c2.getSlot().getAllocatedSlot().getInstance(), s2.getInstance());
+ assertEquals(c3.getSlot().getAllocatedSlot().getInstance(), s3.getInstance());
+ assertEquals(c4.getSlot().getAllocatedSlot().getInstance(), s4.getInstance());
+ assertEquals(c5.getSlot().getAllocatedSlot().getInstance(), s8.getInstance());
+ assertEquals(c6.getSlot().getAllocatedSlot().getInstance(), s9.getInstance());
+
+ // check the scheduler's bookkeeping
+ assertEquals(0, scheduler.getNumberOfAvailableSlots());
+
+ // the first assignments are unconstrained, co.-schedulings are constrained
+ assertEquals(6, scheduler.getNumberOfLocalizedAssignments());
+ assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
+ assertEquals(6, scheduler.getNumberOfUnconstrainedAssignments());
+
+ // release some slots, be sure that new available ones come up
+ s4.releaseSlot();
+ s10.releaseSlot();
+ assertEquals(1, scheduler.getNumberOfAvailableSlots());
+
+ AllocatedSlot single = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)));
+ assertNotNull(single);
+
+ s1.releaseSlot();
+ s2.releaseSlot();
+ s3.releaseSlot();
+ s5.releaseSlot();
+ s6.releaseSlot();
+ s7.releaseSlot();
+ s8.releaseSlot();
+ s9.releaseSlot();
+ s11.releaseSlot();
+ s12.releaseSlot();
+
+ assertEquals(5, scheduler.getNumberOfAvailableSlots());
+
+ assertEquals(6, scheduler.getNumberOfLocalizedAssignments());
+ assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
+ assertEquals(7, scheduler.getNumberOfUnconstrainedAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void scheduleWithIntermediateRelease() {
+ try {
+ JobVertexID jid1 = new JobVertexID();
+ JobVertexID jid2 = new JobVertexID();
+ JobVertexID jid3 = new JobVertexID();
+ JobVertexID jid4 = new JobVertexID();
+
+ Scheduler scheduler = new Scheduler();
+
+ Instance i1 = getRandomInstance(1);
+ Instance i2 = getRandomInstance(1);
+
+ scheduler.newInstanceAvailable(i1);
+ scheduler.newInstanceAvailable(i2);
+
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+
+ CoLocationConstraint c1 = new CoLocationConstraint();
+
+ AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), c1));
+ AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), c1));
+
+ AllocatedSlot sSolo = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 1)));
+
+ Instance loc = s1.getInstance();
+
+ s1.releaseSlot();
+ s2.releaseSlot();
+ sSolo.releaseSlot();
+
+ AllocatedSlot sNew = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), c1));
+ assertEquals(loc, sNew.getInstance());
+
+ assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
+ assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
+ assertEquals(2, scheduler.getNumberOfUnconstrainedAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void scheduleWithReleaseNoResource() {
+ try {
+ JobVertexID jid1 = new JobVertexID();
+ JobVertexID jid2 = new JobVertexID();
+ JobVertexID jid3 = new JobVertexID();
+
+ Scheduler scheduler = new Scheduler();
+
+ Instance i1 = getRandomInstance(1);
+ Instance i2 = getRandomInstance(1);
+
+ scheduler.newInstanceAvailable(i1);
+ scheduler.newInstanceAvailable(i2);
+
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+
+ CoLocationConstraint c1 = new CoLocationConstraint();
+
+ AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), c1));
+ s1.releaseSlot();
+
+ scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1)));
+ scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2)));
+
+
+ try {
+ scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), c1));
+ fail("Scheduled even though no resource was available.");
+ } catch (NoResourceAvailableException e) {
+ // expected
+ }
+
+ assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
+ assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
+ assertEquals(3, scheduler.getNumberOfUnconstrainedAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}