Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:19 UTC
[55/63] [abbrv] git commit: Make Co-Location constraints resilient against out-of-order scheduling and deeply integrate them with slot sharing
Fix miscellaneous checkstyle errors/warnings
Make Co-Location constraints resilient against out-of-order scheduling and deeply integrate them with slot sharing
Fix miscellaneous checkstyle errors/warnings
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cdee8750
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cdee8750
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cdee8750
Branch: refs/heads/master
Commit: cdee87501762c092c216adf35fceeea339e0c4c4
Parents: 8e7216a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 17 05:22:24 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:21:01 2014 +0200
----------------------------------------------------------------------
.../flink/client/CliFrontendListCancelTest.java | 7 +-
.../plantranslate/NepheleJobGraphGenerator.java | 7 +-
.../flink/runtime/jobmanager/JobManager.java | 7 +-
.../scheduler/CoLocationConstraint.java | 46 ++-
.../jobmanager/scheduler/CoLocationGroup.java | 10 +-
.../runtime/jobmanager/scheduler/Scheduler.java | 102 ++----
.../jobmanager/scheduler/SharedSlot.java | 93 +++---
.../scheduler/SlotSharingGroupAssignment.java | 258 ++++++++++-----
.../runtime/jobmanager/scheduler/SubSlot.java | 12 +-
.../protocols/ExtendedManagementProtocol.java | 4 +-
.../runtime/jobgraph/JobManagerTestUtils.java | 4 +-
.../runtime/jobmanager/JobManagerITCase.java | 14 +-
.../ScheduleWithCoLocationHintTest.java | 322 ++++++++++++++++++-
.../scheduler/SchedulerTestUtils.java | 1 +
.../jobmanager/scheduler/SharedSlotsTest.java | 32 +-
.../runtime/testutils/ServerTestUtils.java | 2 +-
16 files changed, 655 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
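For orientation before the per-file diffs, here is a condensed usage sketch of the reworked API, written in the style of the new tests further down in this commit. The getRandomInstance/getTestVertex helpers come from SchedulerTestUtils; the snippet illustrates the commit's API and is not itself part of the commit.

    // Constraints are now created through a CoLocationGroup and bind to a
    // SharedSlot (instead of an Instance), which is what makes them robust
    // against out-of-order scheduling and integrates them with slot sharing.
    JobVertexID jid1 = new JobVertexID();
    JobVertexID jid2 = new JobVertexID();

    Scheduler scheduler = new Scheduler();
    scheduler.newInstanceAvailable(getRandomInstance(1));

    SlotSharingGroup sharingGroup = new SlotSharingGroup();
    CoLocationGroup group = new CoLocationGroup();
    CoLocationConstraint constraint = new CoLocationConstraint(group);

    // both tasks land in the same shared slot, hence on the same instance
    AllocatedSlot s1 = scheduler.scheduleImmediately(
            new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, constraint));
    AllocatedSlot s2 = scheduler.scheduleImmediately(
            new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, constraint));

    assertEquals(s1.getInstance(), s2.getInstance());
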
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 2f31181..40d10b6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -183,7 +183,12 @@ public class CliFrontendListCancelTest {
}
@Override
- public int getAvailableSlots() {
+ public int getTotalNumberOfRegisteredSlots() {
+ return 1;
+ }
+
+ @Override
+ public int getNumberOfSlotsAvailableToScheduler() throws IOException {
return 1;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index a3fef17..a224324 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -493,11 +493,10 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
if (this.currentIteration != null) {
AbstractJobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
- if (head == null) {
- throw new CompilerException("Found no iteration head task in the postVisit of translating a task inside an iteration");
+ // the head may still be null if we descend into the static parts first
+ if (head != null) {
+ targetVertex.setStrictlyCoLocatedWith(head);
}
-
- targetVertex.setStrictlyCoLocatedWith(head);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index f79fecb..1c02127 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -574,10 +574,15 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
}
@Override
- public int getAvailableSlots() {
+ public int getTotalNumberOfRegisteredSlots() {
return getInstanceManager().getTotalNumberOfSlots();
}
+ @Override
+ public int getNumberOfSlotsAvailableToScheduler() {
+ return scheduler.getNumberOfAvailableSlots();
+ }
+
/**
* Starts the Jetty Infoserver for the Jobmanager
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index 64f7ffc..89b644b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -18,30 +18,52 @@
package org.apache.flink.runtime.jobmanager.scheduler;
+import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.instance.Instance;
+import com.google.common.base.Preconditions;
+
public class CoLocationConstraint {
- private volatile Instance location;
+ private final CoLocationGroup group;
+ private volatile SharedSlot sharedSlot;
- public void setLocation(Instance location) {
- if (location == null) {
- throw new IllegalArgumentException();
- }
-
- if (this.location == null) {
- this.location = location;
+
+ CoLocationConstraint(CoLocationGroup group) {
+ Preconditions.checkNotNull(group);
+ this.group = group;
+ }
+
+
+ public SharedSlot getSharedSlot() {
+ return sharedSlot;
+ }
+
+ public Instance getLocation() {
+ if (sharedSlot != null) {
+ return sharedSlot.getAllocatedSlot().getInstance();
} else {
- throw new IllegalStateException("The constraint has already been assigned a location.");
+ throw new IllegalStateException("Not assigned");
}
}
- public Instance getLocation() {
- return location;
+ public void setSharedSlot(SharedSlot sharedSlot) {
+ if (this.sharedSlot == sharedSlot) {
+ return;
+ }
+ else if (this.sharedSlot == null || this.sharedSlot.isDisposed()) {
+ this.sharedSlot = sharedSlot;
+ } else {
+ throw new IllegalStateException("Overriding shared slot that is still alive.");
+ }
}
public boolean isUnassigned() {
- return this.location == null;
+ return this.sharedSlot == null;
+ }
+
+ public AbstractID getGroupId() {
+ return this.group.getId();
}
}
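The resilience against out-of-order scheduling rests on the setSharedSlot contract above: assigning the same slot twice is a no-op, a disposed slot may be replaced, and only overwriting a live slot fails. A hypothetical sequence (slotA and slotB stand in for SharedSlots produced by the scheduler; illustration only):

    CoLocationConstraint c = new CoLocationConstraint(new CoLocationGroup());

    // c.isUnassigned() == true         no shared slot yet
    c.setSharedSlot(slotA);           // first assignment
    c.setSharedSlot(slotA);           // same slot again: silently accepted
    // c.setSharedSlot(slotB);        // would throw: slotA is still alive

    // ... all sub slots of slotA are released, slotA.isDisposed() == true ...
    c.setSharedSlot(slotB);           // accepted: a dead slot may be replaced
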
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
index 2398334..84692a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
import java.util.ArrayList;
import java.util.List;
+import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import com.google.common.base.Preconditions;
@@ -29,6 +30,9 @@ public class CoLocationGroup implements java.io.Serializable {
private static final long serialVersionUID = -2605819490401895297L;
+ // we use a plain AbstractID rather than a job vertex ID, because the co-location group acts as a
+ // unit inside which exclusive sharing of slots is used
+ private final AbstractID id = new AbstractID();
private final List<AbstractJobVertex> vertices = new ArrayList<AbstractJobVertex>();
@@ -80,8 +84,12 @@ public class CoLocationGroup implements java.io.Serializable {
if (num > constraints.size()) {
constraints.ensureCapacity(num);
for (int i = constraints.size(); i < num; i++) {
- constraints.add(new CoLocationConstraint());
+ constraints.add(new CoLocationConstraint(this));
}
}
}
+
+ public AbstractID getId() {
+ return id;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index b9f83fc..20e4d17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -41,7 +41,7 @@ import org.apache.flink.util.ExceptionUtils;
*/
public class Scheduler implements InstanceListener, SlotAvailablilityListener {
- private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
+ static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
private final Object globalLock = new Object();
@@ -150,56 +150,6 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
synchronized (globalLock) {
-
-// // 1) === If the task has a strict co-schedule hint, obey it ===
-//
-// CoLocationConstraint locationConstraint = task.getLocationConstraint();
-// if (locationConstraint != null) {
-// // location constraints can never be scheduled in a queued fashion
-// if (queueIfNoResource) {
-// throw new IllegalArgumentException("A task with a location constraint was scheduled in a queued fashion.");
-// }
-//
-// // since we are inside the global lock scope, we can check, allocate, and assign
-// // in one atomic action. however, slots may die and be deallocated
-//
-// // (a) is the constraint has not yet has a slot, get one
-// if (locationConstraint.isUnassigned()) {
-// // try and get a slot
-// AllocatedSlot newSlot = getFreeSlotForTask(vertex);
-// if (newSlot == null) {
-// throw new NoResourceAvailableException();
-// }
-// SharedSlot sl = locationConstraint.swapInNewSlot(newSlot);
-// SubSlot slot = sl.allocateSubSlot(vertex.getJobvertexId());
-//
-// updateLocalityCounters(newSlot.getLocality());
-// return slot;
-// }
-// else {
-// // try to get a subslot. returns null, if the location's slot has been released
-// // in the meantime
-// SubSlot slot = locationConstraint.allocateSubSlot(vertex.getJobvertexId());
-// if (slot == null) {
-// // get a new slot. at the same instance!!!
-// Instance location = locationConstraint.getSlot().getAllocatedSlot().getInstance();
-// AllocatedSlot newSlot;
-// try {
-// newSlot = location.allocateSlot(vertex.getJobId());
-// } catch (InstanceDiedException e) {
-// throw new NoResourceAvailableException("The instance of the required location died.");
-// }
-// if (newSlot == null) {
-// throw new NoResourceAvailableException();
-// }
-// SharedSlot sharedSlot = locationConstraint.swapInNewSlot(newSlot);
-// slot = sharedSlot.allocateSubSlot(vertex.getJobvertexId());
-// }
-//
-// updateLocalityCounters(Locality.LOCAL);
-// return slot;
-// }
-// }
// 1) === If the task has a slot sharing group, schedule with shared slots ===
@@ -213,18 +163,17 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
final CoLocationConstraint constraint = task.getLocationConstraint();
- AllocatedSlot newSlot = null;
-
- // get a slot from the group. obey location constraints, if existing and assigned
- AllocatedSlot slotFromGroup;
- if (constraint == null || constraint.isUnassigned()) {
- slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), vertex);
+ // get a slot from the group, if the group has one for us (and can fulfill the constraint)
+ SubSlot slotFromGroup;
+ if (constraint == null) {
+ slotFromGroup = assignment.getSlotForTask(vertex);
}
else {
- // this returns null, if the constraint cannot be fulfilled
- slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), constraint);
+ slotFromGroup = assignment.getSlotForTask(vertex, constraint);
}
+ AllocatedSlot newSlot = null;
+
// the following needs to make sure any allocated slot is released in case of an error
try {
@@ -232,12 +181,6 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
if (slotFromGroup != null) {
// local (or unconstrained in the current group)
if (slotFromGroup.getLocality() != Locality.NON_LOCAL) {
-
- // attach to the locality constraint
- if (constraint != null && constraint.isUnassigned()) {
- constraint.setLocation(slotFromGroup.getInstance());
- }
-
updateLocalityCounters(slotFromGroup.getLocality());
return slotFromGroup;
}
@@ -249,13 +192,19 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
// get a new slot, since we could not place it into the group, or we could not place it locally
newSlot = getFreeSlotForTask(vertex, locations);
- AllocatedSlot toUse;
+ SubSlot toUse;
if (newSlot == null) {
if (slotFromGroup == null) {
// both null
- throw new NoResourceAvailableException();
+ if (constraint == null || constraint.isUnassigned()) {
+ throw new NoResourceAvailableException();
+ } else {
+ throw new NoResourceAvailableException("Could not allocate a slot on instance " +
+ constraint.getLocation() + ", as required by the co-location constraint.");
+ }
} else {
+ // got a non-local from the group, and no new one
toUse = slotFromGroup;
}
}
@@ -265,17 +214,28 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
slotFromGroup.releaseSlot();
}
- toUse = sharingUnit.getTaskAssignment().addSlotWithTask(newSlot, task.getJobVertexId());
+ if (constraint == null) {
+ toUse = assignment.addNewSlotWithTask(newSlot, vertex);
+ } else {
+ toUse = assignment.addNewSlotWithTask(newSlot, vertex, constraint);
+ }
}
else {
- // both are available and potentially usable
+ // both are available and usable. neither is local
newSlot.releaseSlot();
toUse = slotFromGroup;
}
// assign to the co-location hint, if we have one and it is unassigned
- if (constraint != null && constraint.isUnassigned()) {
- constraint.setLocation(toUse.getInstance());
+ // if it was assigned before and the new one is not local, it is a fail
+ if (constraint != null) {
+ if (constraint.isUnassigned() || toUse.getLocality() == Locality.LOCAL) {
+ constraint.setSharedSlot(toUse.getSharedSlot());
+ } else {
+ // the constraint is already assigned, and the new slot is not local: fail
+ throw new NoResourceAvailableException("Could not allocate a slot on instance " +
+ constraint.getLocation() + ", as required by the co-location constraint.");
+ }
}
updateLocalityCounters(toUse.getLocality());
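Condensed, the reworked branch above does the following (a paraphrase of the hunk, with error cleanup, release of the unused candidate slot, the locality-preference computation, and the locality counters omitted; getFreeSlotForTask is the scheduler's own helper):

    // 1) ask the sharing group; with a constraint, the group must honor it
    SubSlot slotFromGroup = (constraint == null)
            ? assignment.getSlotForTask(vertex)
            : assignment.getSlotForTask(vertex, constraint);

    // 2) a local or unconstrained slot from the group wins immediately
    if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) {
        return slotFromGroup;
    }

    // 3) otherwise try a fresh slot; if neither exists, scheduling fails
    AllocatedSlot newSlot = getFreeSlotForTask(vertex, locations);
    if (newSlot == null && slotFromGroup == null) {
        throw new NoResourceAvailableException();
    }
    SubSlot toUse = (newSlot == null)
            ? slotFromGroup                    // fall back to the non-local group slot
            : (constraint == null
                    ? assignment.addNewSlotWithTask(newSlot, vertex)
                    : assignment.addNewSlotWithTask(newSlot, vertex, constraint));

    // 4) bind the constraint to the chosen shared slot; an already assigned
    //    constraint accepts only a local match, otherwise scheduling fails
    if (constraint != null) {
        if (constraint.isUnassigned() || toUse.getLocality() == Locality.LOCAL) {
            constraint.setSharedSlot(toUse.getSharedSlot());
        } else {
            throw new NoResourceAvailableException("Could not allocate a slot on instance "
                    + constraint.getLocation() + ", as required by the co-location constraint.");
        }
    }
    return toUse;
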
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
index 0f3687a..5d87e07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
@@ -24,7 +24,13 @@ import java.util.Set;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-public class SharedSlot {
+/**
+ *
+ * NOTE: This class does no synchronization by itself and its mutating
+ * methods may only be called from within the synchronization scope of
+ * its associated SlotSharingGroupAssignment.
+ */
+class SharedSlot {
private final AllocatedSlot allocatedSlot;
@@ -48,77 +54,58 @@ public class SharedSlot {
this.subSlots = new HashSet<SubSlot>();
}
- public SharedSlot(AllocatedSlot allocatedSlot) {
- if (allocatedSlot == null) {
- throw new NullPointerException();
- }
-
- this.allocatedSlot = allocatedSlot;
- this.assignmentGroup = null;;
- this.subSlots = new HashSet<SubSlot>();
- }
-
// --------------------------------------------------------------------------------------------
- public AllocatedSlot getAllocatedSlot() {
+ AllocatedSlot getAllocatedSlot() {
return this.allocatedSlot;
}
- public boolean isDisposed() {
+ boolean isDisposed() {
return disposed;
}
- public int getNumberOfAllocatedSubSlots() {
- synchronized (this.subSlots) {
- return this.subSlots.size();
- }
+ int getNumberOfAllocatedSubSlots() {
+ return this.subSlots.size();
}
- public SubSlot allocateSubSlot(JobVertexID jid) {
- synchronized (this.subSlots) {
- if (isDisposed()) {
- return null;
- } else {
- SubSlot ss = new SubSlot(this, subSlotNumber++, jid);
- this.subSlots.add(ss);
- return ss;
- }
+ SubSlot allocateSubSlot(JobVertexID jid) {
+ if (disposed) {
+ return null;
+ } else {
+ SubSlot ss = new SubSlot(this, subSlotNumber++, jid);
+ this.subSlots.add(ss);
+ return ss;
}
}
- public void rease() {
- synchronized (this.subSlots) {
- disposed = true;
- for (SubSlot ss : subSlots) {
- ss.releaseSlot();
- }
+ void returnAllocatedSlot(SubSlot slot) {
+ if (!slot.isReleased()) {
+ throw new IllegalArgumentException("SubSlot is not released.");
}
- allocatedSlot.releaseSlot();
+ this.assignmentGroup.releaseSubSlot(slot, this);
}
- void returnAllocatedSlot(SubSlot slot) {
- boolean release;
-
- synchronized (this.subSlots) {
- if (!this.subSlots.remove(slot)) {
- throw new IllegalArgumentException("Wrong shared slot for subslot.");
- }
-
- if (assignmentGroup != null) {
- release = assignmentGroup.sharedSlotAvailableForJid(this, slot.getJobVertexId(), this.subSlots.isEmpty());
- } else {
- release = subSlots.isEmpty();
- }
-
- if (release) {
- disposed = true;
- }
+ int releaseSlot(SubSlot slot) {
+ if (!this.subSlots.remove(slot)) {
+ throw new IllegalArgumentException("Wrong shared slot for subslot.");
}
-
- // do this call outside the lock, because releasing the allocated slot may go into further scheduler calls
- if (release) {
+ return subSlots.size();
+ }
+
+ void dispose() {
+ if (subSlots.isEmpty()) {
+ disposed = true;
this.allocatedSlot.releaseSlot();
+ } else {
+ throw new IllegalStateException("Cannot dispose while subslots are still alive.");
}
}
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "Shared " + allocatedSlot.toString();
+ }
}
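The net effect of this restructuring is that all locking moved out of SharedSlot and into the assignment. The release path is now a single chain under the assignment's lock, sketched here for a SubSlot sub handed out by the assignment (call-chain annotation, not code from the commit):

    sub.releaseSlot();
    // -> SharedSlot.returnAllocatedSlot(sub)                   checks sub.isReleased()
    // -> SlotSharingGroupAssignment.releaseSubSlot(sub, this)  enters the lock
    //    -> sharedSlot.releaseSlot(sub)                        removes sub, returns remaining count
    //    -> sharedSlot.dispose()                               only once the count hits zero;
    //                                                          releases the underlying AllocatedSlot
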
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
index 474fdbe..68d3888 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
@@ -28,34 +28,53 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.slf4j.Logger;
+
public class SlotSharingGroupAssignment {
+ private static final Logger LOG = Scheduler.LOG;
+
+ private final Object lock = new Object();
+
/** All slots currently allocated to this sharing group */
private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
/** The slots available per vertex type (jid), keyed by instance, to make them locatable */
- private final Map<JobVertexID, Map<Instance, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<JobVertexID, Map<Instance, List<SharedSlot>>>();
+ private final Map<AbstractID, Map<Instance, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<AbstractID, Map<Instance, List<SharedSlot>>>();
// --------------------------------------------------------------------------------------------
- public SubSlot addSlotWithTask(AllocatedSlot slot, JobVertexID jid) {
+ public SubSlot addNewSlotWithTask(AllocatedSlot slot, ExecutionVertex vertex) {
+ JobVertexID id = vertex.getJobvertexId();
+ return addNewSlotWithTask(slot, id, id);
+ }
+
+ public SubSlot addNewSlotWithTask(AllocatedSlot slot, ExecutionVertex vertex, CoLocationConstraint constraint) {
+ AbstractID groupId = constraint.getGroupId();
+ return addNewSlotWithTask(slot, groupId, null);
+ }
+
+ private SubSlot addNewSlotWithTask(AllocatedSlot slot, AbstractID groupId, JobVertexID vertexId) {
final SharedSlot sharedSlot = new SharedSlot(slot, this);
final Instance location = slot.getInstance();
- synchronized (allSlots) {
+ synchronized (lock) {
// add to the total bookkeeping
allSlots.add(sharedSlot);
// allocate us a sub slot to return
- SubSlot subslot = sharedSlot.allocateSubSlot(jid);
+ SubSlot subslot = sharedSlot.allocateSubSlot(vertexId);
// preserve the locality information
subslot.setLocality(slot.getLocality());
@@ -63,10 +82,9 @@ public class SlotSharingGroupAssignment {
boolean entryForNewJidExists = false;
// let the other vertex types know about this one as well
-
- for (Map.Entry<JobVertexID, Map<Instance, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
+ for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
- if (entry.getKey().equals(jid)) {
+ if (entry.getKey().equals(groupId)) {
entryForNewJidExists = true;
continue;
}
@@ -75,9 +93,9 @@ public class SlotSharingGroupAssignment {
putIntoMultiMap(available, location, sharedSlot);
}
- // make sure an empty entry exists for this jid, if no other entry exists
+ // make sure an empty entry exists for this group, if no other entry exists
if (!entryForNewJidExists) {
- availableSlotsPerJid.put(jid, new LinkedHashMap<Instance, List<SharedSlot>>());
+ availableSlotsPerJid.put(groupId, new LinkedHashMap<Instance, List<SharedSlot>>());
}
return subslot;
@@ -90,105 +108,97 @@ public class SlotSharingGroupAssignment {
* slots if no local slot is available. The method returns null, when no slot is available for the
* given JobVertexID at all.
*
- * @param jid
* @param vertex
*
* @return A sub slot for a task with the given JobVertexID, or null, if none is available.
*/
- public AllocatedSlot getSlotForTask(JobVertexID jid, ExecutionVertex vertex) {
- synchronized (allSlots) {
- return getSlotForTaskInternal(jid, vertex.getPreferredLocations(), false);
- }
- }
-
-
- public AllocatedSlot getSlotForTask(JobVertexID jid, CoLocationConstraint constraint) {
- if (constraint.isUnassigned()) {
- throw new IllegalArgumentException("CoLocationConstraint is unassigned");
+ public SubSlot getSlotForTask(ExecutionVertex vertex) {
+ synchronized (lock) {
+ Pair<SharedSlot, Locality> p = getSlotForTaskInternal(vertex.getJobvertexId(), vertex, vertex.getPreferredLocations(), false);
+
+ if (p != null) {
+ SharedSlot ss = p.getLeft();
+ SubSlot slot = ss.allocateSubSlot(vertex.getJobvertexId());
+ slot.setLocality(p.getRight());
+ return slot;
+ }
+ else {
+ return null;
+ }
}
- synchronized (allSlots) {
- return getSlotForTaskInternal(jid, Collections.singleton(constraint.getLocation()), true);
- }
}
-
- public boolean sharedSlotAvailableForJid(SharedSlot slot, JobVertexID jid, boolean lastSubSlot) {
- if (slot == null || jid == null) {
- throw new NullPointerException();
- }
+ public SubSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
- synchronized (allSlots) {
- if (!allSlots.contains(slot)) {
- throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
- }
+ synchronized (lock) {
+ SharedSlot shared = constraint.getSharedSlot();
- if (lastSubSlot) {
- // this was the last sub slot. unless there is something pending for this jid
- // remove this from the availability list of all jids and
- // return that this one is good to release
- allSlots.remove(slot);
-
- Instance location = slot.getAllocatedSlot().getInstance();
-
- for (Map.Entry<JobVertexID, Map<Instance, List<SharedSlot>>> mapEntry : availableSlotsPerJid.entrySet()) {
- if (mapEntry.getKey().equals(jid)) {
- continue;
- }
+ if (shared != null && !shared.isDisposed()) {
+ // initialized and set
+ SubSlot subslot = shared.allocateSubSlot(null);
+ subslot.setLocality(Locality.LOCAL);
+ return subslot;
+ }
+ else if (shared == null) {
+ // not initialized, grab a new slot. preferred locations are defined by the vertex
+ // we only associate the slot with the constraint, if it was a local match
+ Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(), vertex, vertex.getPreferredLocations(), false);
+ if (p == null) {
+ return null;
+ } else {
+ shared = p.getLeft();
+ Locality l = p.getRight();
- Map<Instance, List<SharedSlot>> map = mapEntry.getValue();
- List<SharedSlot> list = map.get(location);
- if (list == null || !list.remove(slot)) {
- throw new IllegalStateException("SharedSlot was not available to another vertex type that it was not allocated for before.");
- }
- if (list.isEmpty()) {
- map.remove(location);
+ SubSlot sub = shared.allocateSubSlot(null);
+ sub.setLocality(l);
+
+ if (l != Locality.NON_LOCAL) {
+ constraint.setSharedSlot(shared);
}
+ return sub;
}
-
- return true;
}
-
- Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(jid);
-
- // sanity check
- if (slotsForJid == null) {
- throw new IllegalStateException("Trying to return a slot for jid " + jid +
- " when available slots indicated that all slots were available.");
+ else {
+ // disposed. get a new slot on the same instance
+ Instance location = shared.getAllocatedSlot().getInstance();
+ Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(), vertex, Collections.singleton(location), true);
+ if (p == null) {
+ return null;
+ } else {
+ shared = p.getLeft();
+ constraint.setSharedSlot(shared);
+ SubSlot subslot = shared.allocateSubSlot(null);
+ subslot.setLocality(Locality.LOCAL);
+ return subslot;
+ }
}
-
- putIntoMultiMap(slotsForJid, slot.getAllocatedSlot().getInstance(), slot);
-
- // do not release, we are still depending on this shared slot
- return false;
}
}
-
/**
* NOTE: This method is not synchronized by itself, needs to be synchronized externally.
*
- * @param jid
* @return An allocated sub slot, or {@code null}, if no slot is available.
*/
- private AllocatedSlot getSlotForTaskInternal(JobVertexID jid, Iterable<Instance> preferredLocations, boolean localOnly) {
+ private Pair<SharedSlot, Locality> getSlotForTaskInternal(AbstractID groupId, ExecutionVertex vertex, Iterable<Instance> preferredLocations, boolean localOnly) {
if (allSlots.isEmpty()) {
return null;
}
- Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(jid);
+ Map<Instance, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
- // get the available slots for the vertex type (jid)
- if (slotsForJid == null) {
- // no task is yet scheduled for that jid, so all slots are available
- slotsForJid = new LinkedHashMap<Instance, List<SharedSlot>>();
- availableSlotsPerJid.put(jid, slotsForJid);
+ // get the available slots for the group
+ if (slotsForGroup == null) {
+ // no task is yet scheduled for that group, so all slots are available
+ slotsForGroup = new LinkedHashMap<Instance, List<SharedSlot>>();
+ availableSlotsPerJid.put(groupId, slotsForGroup);
for (SharedSlot availableSlot : allSlots) {
- putIntoMultiMap(slotsForJid, availableSlot.getAllocatedSlot().getInstance(), availableSlot);
+ putIntoMultiMap(slotsForGroup, availableSlot.getAllocatedSlot().getInstance(), availableSlot);
}
}
- else if (slotsForJid.isEmpty()) {
+ else if (slotsForGroup.isEmpty()) {
return null;
}
@@ -202,32 +212,102 @@ public class SlotSharingGroupAssignment {
// we return early anyways and skip the flag evaluation
didNotGetPreferred = true;
- SharedSlot slot = removeFromMultiMap(slotsForJid, location);
- if (slot != null) {
- SubSlot subslot = slot.allocateSubSlot(jid);
- subslot.setLocality(Locality.LOCAL);
- return subslot;
+ SharedSlot slot = removeFromMultiMap(slotsForGroup, location);
+ if (slot != null && !slot.isDisposed()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Local assignment in shared group : " + vertex + " --> " + slot);
+ }
+
+ return new ImmutablePair<SharedSlot, Locality>(slot, Locality.LOCAL);
}
}
}
// if we want only local assignments, exit now with a "not found" result
if (didNotGetPreferred && localOnly) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No local assignment in shared group possible for " + vertex);
+ }
return null;
}
// schedule the task to any available location
- SharedSlot slot = pollFromMultiMap(slotsForJid);
- if (slot != null) {
- SubSlot subslot = slot.allocateSubSlot(jid);
- subslot.setLocality(didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
- return subslot;
+ SharedSlot slot = pollFromMultiMap(slotsForGroup);
+ if (slot != null && !slot.isDisposed()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug((didNotGetPreferred ? "Non-local" : "Unconstrained") + " assignment in shared group : " + vertex + " --> " + slot);
+ }
+
+ return new ImmutablePair<SharedSlot, Locality>(slot, didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
}
else {
return null;
}
}
+
+ void releaseSubSlot(SubSlot subslot, SharedSlot sharedSlot) {
+
+ AbstractID groupId = subslot.getGroupId();
+
+ synchronized (lock) {
+
+ if (!allSlots.contains(sharedSlot)) {
+ throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
+ }
+
+ int slotsRemaining = sharedSlot.releaseSlot(subslot);
+
+ if (slotsRemaining == 0) {
+ // this was the last sub slot. remove this from the availability list
+ // and trigger disposal
+ try {
+ allSlots.remove(sharedSlot);
+
+ Instance location = sharedSlot.getAllocatedSlot().getInstance();
+
+ if (groupId != null) {
+ for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> mapEntry : availableSlotsPerJid.entrySet()) {
+ AbstractID id = mapEntry.getKey();
+
+ // hack: we identify co location hint entries by the fact that they are keyed
+ // by an abstract id, rather than a job vertex id
+ if (id.getClass() == AbstractID.class || id.equals(groupId)) {
+ continue;
+ }
+
+ Map<Instance, List<SharedSlot>> map = mapEntry.getValue();
+ List<SharedSlot> list = map.get(location);
+ if (list == null || !list.remove(sharedSlot)) {
+ throw new IllegalStateException("Bug: SharedSlot was missing from the availability list of another vertex group.");
+ }
+ if (list.isEmpty()) {
+ map.remove(location);
+ }
+ }
+ }
+ } finally {
+ sharedSlot.dispose();
+ }
+ }
+ else if (groupId != null) {
+ // make the shared slot available again to tasks within its group
+ Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupId);
+
+ // sanity check
+ if (slotsForJid == null) {
+ throw new IllegalStateException("Trying to return a slot for group " + groupId +
+ " when available slots indicated that all slots were available.");
+ }
+
+ putIntoMultiMap(slotsForJid, sharedSlot.getAllocatedSlot().getInstance(), sharedSlot);
+ }
+ }
+ }
+
+
+
+
// --------------------------------------------------------------------------------------------
// State
// --------------------------------------------------------------------------------------------
@@ -237,7 +317,7 @@ public class SlotSharingGroupAssignment {
}
public int getNumberOfAvailableSlotsForJid(JobVertexID jid) {
- synchronized (allSlots) {
+ synchronized (lock) {
Map<Instance, List<SharedSlot>> available = availableSlotsPerJid.get(jid);
if (available != null) {
@@ -255,6 +335,10 @@ public class SlotSharingGroupAssignment {
}
}
}
+
+ // --------------------------------------------------------------------------------------------
+
+
// --------------------------------------------------------------------------------------------
// Utilities
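Why the availability map is now keyed by AbstractID: regular tasks keep using their JobVertexID (a subclass of AbstractID), while co-located tasks use the CoLocationGroup's freshly generated AbstractID, so one bookkeeping structure serves both. The lookup rule as a sketch (names reused from this class; illustration only):

    // which key a task uses in availableSlotsPerJid
    AbstractID key = (constraint == null)
            ? vertex.getJobvertexId()      // regular task: its vertex id
            : constraint.getGroupId();     // co-located task: the group's id

    Map<Instance, List<SharedSlot>> available = availableSlotsPerJid.get(key);
    // the release-time check above exploits exactly this distinction: a key whose
    // runtime class is the plain AbstractID marks a co-location entry
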
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
index ca2fb5e..7ff2990 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
@@ -18,25 +18,25 @@
package org.apache.flink.runtime.jobmanager.scheduler;
+import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.instance.AllocatedSlot;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
public class SubSlot extends AllocatedSlot {
private final SharedSlot sharedSlot;
- private final JobVertexID jid;
+ private final AbstractID groupId;
private final int subSlotNumber;
- public SubSlot(SharedSlot sharedSlot, int subSlotNumber, JobVertexID jid) {
+ public SubSlot(SharedSlot sharedSlot, int subSlotNumber, AbstractID groupId) {
super(sharedSlot.getAllocatedSlot().getJobID(),
sharedSlot.getAllocatedSlot().getInstance(),
sharedSlot.getAllocatedSlot().getSlotNumber());
this.sharedSlot = sharedSlot;
- this.jid = jid;
+ this.groupId = groupId;
this.subSlotNumber = subSlotNumber;
}
@@ -59,8 +59,8 @@ public class SubSlot extends AllocatedSlot {
return this.sharedSlot;
}
- public JobVertexID getJobVertexId() {
- return jid;
+ public AbstractID getGroupId() {
+ return groupId;
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ExtendedManagementProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ExtendedManagementProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ExtendedManagementProtocol.java
index 9e3e22e..909a595 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ExtendedManagementProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ExtendedManagementProtocol.java
@@ -59,5 +59,7 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol {
* @return number of available slots
* @throws IOException
*/
- int getAvailableSlots() throws IOException;
+ int getTotalNumberOfRegisteredSlots() throws IOException;
+
+ int getNumberOfSlotsAvailableToScheduler() throws IOException;
}
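The former getAvailableSlots is split because the two numbers legitimately differ: one counts everything the task managers have registered, the other what the scheduler can still hand out. A hypothetical caller:

    // total registered capacity vs. capacity the scheduler can still assign
    int registered = jobManager.getTotalNumberOfRegisteredSlots();
    int schedulable = jobManager.getNumberOfSlotsAvailableToScheduler();
    assert schedulable <= registered;
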
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index 14a73e1..1d5471e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -46,11 +46,11 @@ public class JobManagerTestUtils {
// max time is 5 seconds
long deadline = System.currentTimeMillis() + 5000;
- while (jm.getAvailableSlots() < numSlots && System.currentTimeMillis() < deadline) {
+ while (jm.getNumberOfSlotsAvailableToScheduler() < numSlots && System.currentTimeMillis() < deadline) {
Thread.sleep(10);
}
- assertEquals(numSlots, jm.getAvailableSlots());
+ assertEquals(numSlots, jm.getNumberOfSlotsAvailableToScheduler());
return jm;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index 70b3ad9..b07cc87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -68,7 +68,7 @@ public class JobManagerITCase {
try {
- assertEquals(1, jm.getAvailableSlots());
+ assertEquals(1, jm.getTotalNumberOfRegisteredSlots());
// we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -141,7 +141,7 @@ public class JobManagerITCase {
try {
- assertEquals(NUM_TASKS, jm.getAvailableSlots());
+ assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
// we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -535,7 +535,7 @@ public class JobManagerITCase {
.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
try {
- assertEquals(NUM_TASKS, jm.getAvailableSlots());
+ assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
// we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -599,7 +599,7 @@ public class JobManagerITCase {
.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
try {
- assertEquals(NUM_TASKS, jm.getAvailableSlots());
+ assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
// we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -727,7 +727,7 @@ public class JobManagerITCase {
.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
try {
- assertEquals(NUM_TASKS, jm.getAvailableSlots());
+ assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
// we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
@@ -788,13 +788,13 @@ public class JobManagerITCase {
final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
- final JobManager jm = startJobManager(NUM_TASKS);
+ final JobManager jm = startJobManager(2*NUM_TASKS);
final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
try {
- assertEquals(NUM_TASKS, jm.getAvailableSlots());
+ assertEquals(2*NUM_TASKS, jm.getNumberOfSlotsAvailableToScheduler());
// we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index 0efe10b..7ea4fd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex;
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertexWithLocation;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -48,12 +49,13 @@ public class ScheduleWithCoLocationHintTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup();
- CoLocationConstraint c1 = new CoLocationConstraint();
- CoLocationConstraint c2 = new CoLocationConstraint();
- CoLocationConstraint c3 = new CoLocationConstraint();
- CoLocationConstraint c4 = new CoLocationConstraint();
- CoLocationConstraint c5 = new CoLocationConstraint();
- CoLocationConstraint c6 = new CoLocationConstraint();
+ CoLocationGroup ccg = new CoLocationGroup();
+ CoLocationConstraint c1 = new CoLocationConstraint(ccg);
+ CoLocationConstraint c2 = new CoLocationConstraint(ccg);
+ CoLocationConstraint c3 = new CoLocationConstraint(ccg);
+ CoLocationConstraint c4 = new CoLocationConstraint(ccg);
+ CoLocationConstraint c5 = new CoLocationConstraint(ccg);
+ CoLocationConstraint c6 = new CoLocationConstraint(ccg);
// schedule 4 tasks from the first vertex group
AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1));
@@ -174,7 +176,7 @@ public class ScheduleWithCoLocationHintTest {
assertEquals(2, scheduler.getNumberOfAvailableSlots());
SlotSharingGroup sharingGroup = new SlotSharingGroup();
- CoLocationConstraint c1 = new CoLocationConstraint();
+ CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1));
@@ -218,7 +220,7 @@ public class ScheduleWithCoLocationHintTest {
assertEquals(2, scheduler.getNumberOfAvailableSlots());
SlotSharingGroup sharingGroup = new SlotSharingGroup();
- CoLocationConstraint c1 = new CoLocationConstraint();
+ CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
s1.releaseSlot();
@@ -260,10 +262,12 @@ public class ScheduleWithCoLocationHintTest {
assertEquals(4, scheduler.getNumberOfAvailableSlots());
- CoLocationConstraint clc1 = new CoLocationConstraint();
- CoLocationConstraint clc2 = new CoLocationConstraint();
- CoLocationConstraint clc3 = new CoLocationConstraint();
- CoLocationConstraint clc4 = new CoLocationConstraint();
+ CoLocationGroup grp = new CoLocationGroup();
+ CoLocationConstraint clc1 = new CoLocationConstraint(grp);
+ CoLocationConstraint clc2 = new CoLocationConstraint(grp);
+ CoLocationConstraint clc3 = new CoLocationConstraint(grp);
+ CoLocationConstraint clc4 = new CoLocationConstraint(grp);
+
SlotSharingGroup shareGroup = new SlotSharingGroup();
// first wave
@@ -303,4 +307,298 @@ public class ScheduleWithCoLocationHintTest {
fail(e.getMessage());
}
}
+
+
+ @Test
+ public void testGetsNonLocalFromSharingGroupFirst() {
+ try {
+ JobVertexID jid1 = new JobVertexID();
+ JobVertexID jid2 = new JobVertexID();
+ JobVertexID jid3 = new JobVertexID();
+
+ Scheduler scheduler = new Scheduler();
+
+ Instance i1 = getRandomInstance(1);
+ Instance i2 = getRandomInstance(1);
+
+ scheduler.newInstanceAvailable(i2);
+ scheduler.newInstanceAvailable(i1);
+
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+
+ SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+ CoLocationGroup ccg = new CoLocationGroup();
+ CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
+ CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
+
+ // schedule something into the shared group so that both instances are in the sharing group
+ AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
+ AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup));
+
+ // schedule one locally to instance 1
+ AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc1));
+
+ // schedule with co location constraint (yet unassigned) and a preference for
+ // instance 1, but it can only get instance 2
+ AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
+
+ // schedule something into the assigned co-location constraints and check that they override the
+ // other preferences
+ AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, i2), sharingGroup, cc1));
+ AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, i1), sharingGroup, cc2));
+
+ // check that each slot got three
+ assertEquals(3, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
+ assertEquals(3, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
+
+ assertEquals(s1.getInstance(), s3.getInstance());
+ assertEquals(s2.getInstance(), s4.getInstance());
+ assertEquals(s1.getInstance(), s5.getInstance());
+ assertEquals(s2.getInstance(), s6.getInstance());
+
+ // check the scheduler's bookkeeping
+ assertEquals(0, scheduler.getNumberOfAvailableSlots());
+
+ assertEquals(5, scheduler.getNumberOfLocalizedAssignments());
+ assertEquals(1, scheduler.getNumberOfNonLocalizedAssignments());
+ assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
+
+ // release some slots, be sure that new available ones come up
+ s1.releaseSlot();
+ s2.releaseSlot();
+ s3.releaseSlot();
+ s4.releaseSlot();
+ s5.releaseSlot();
+ s6.releaseSlot();
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSlotReleasedInBetween() {
+ try {
+ JobVertexID jid1 = new JobVertexID();
+ JobVertexID jid2 = new JobVertexID();
+
+ Scheduler scheduler = new Scheduler();
+
+ Instance i1 = getRandomInstance(1);
+ Instance i2 = getRandomInstance(1);
+
+ scheduler.newInstanceAvailable(i2);
+ scheduler.newInstanceAvailable(i1);
+
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+
+ SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+ CoLocationGroup ccg = new CoLocationGroup();
+ CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
+ CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
+
+ AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+ AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
+
+ s1.releaseSlot();
+ s2.releaseSlot();
+
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+ assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
+
+ AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1));
+ AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
+
+ // still preserves the previous instance mapping
+ assertEquals(i1, s3.getInstance());
+ assertEquals(i2, s4.getInstance());
+
+ s3.releaseSlot();
+ s4.releaseSlot();
+
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+
+ assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
+ assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
+ assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSlotReleasedInBetweenAndNoNewLocal() {
+ try {
+ JobVertexID jid1 = new JobVertexID();
+ JobVertexID jid2 = new JobVertexID();
+ JobVertexID jidx = new JobVertexID();
+
+ Scheduler scheduler = new Scheduler();
+
+ Instance i1 = getRandomInstance(1);
+ Instance i2 = getRandomInstance(1);
+
+ scheduler.newInstanceAvailable(i2);
+ scheduler.newInstanceAvailable(i1);
+
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+
+ SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+ CoLocationGroup ccg = new CoLocationGroup();
+ CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
+ CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
+
+ AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+ AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
+
+ s1.releaseSlot();
+ s2.releaseSlot();
+
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+ assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
+
+ AllocatedSlot sa = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)));
+ AllocatedSlot sb = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)));
+
+ try {
+ scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1));
+ fail("should not be able to find a resource");
+ } catch (NoResourceAvailableException e) {
+ // good
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+
+ sa.releaseSlot();
+ sb.releaseSlot();
+
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+
+ assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
+ assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
+ assertEquals(2, scheduler.getNumberOfUnconstrainedAssignments());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testScheduleOutOfOrder() {
+ try {
+ JobVertexID jid1 = new JobVertexID();
+ JobVertexID jid2 = new JobVertexID();
+
+ Scheduler scheduler = new Scheduler();
+
+ Instance i1 = getRandomInstance(1);
+ Instance i2 = getRandomInstance(1);
+
+ scheduler.newInstanceAvailable(i2);
+ scheduler.newInstanceAvailable(i1);
+
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+
+ SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+ CoLocationGroup ccg = new CoLocationGroup();
+ CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
+ CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
+
+ // schedule something from the second job vertex id before the first is filled,
+ // and give locality preferences that hint at using the same shared slot for both
+ // co location constraints (which we seek to prevent)
+ AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+ AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc2));
+
+ AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc1));
+ AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup, cc2));
+
+ // check that each slot got two
+ assertEquals(2, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
+ assertEquals(2, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
+
+ assertEquals(s1.getInstance(), s3.getInstance());
+ assertEquals(s2.getInstance(), s4.getInstance());
+
+ // check the scheduler's bookkeeping
+ assertEquals(0, scheduler.getNumberOfAvailableSlots());
+
+ assertEquals(3, scheduler.getNumberOfLocalizedAssignments());
+ assertEquals(1, scheduler.getNumberOfNonLocalizedAssignments());
+ assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
+
+ // release some slots, be sure that new available ones come up
+ s1.releaseSlot();
+ s2.releaseSlot();
+ s3.releaseSlot();
+ s4.releaseSlot();
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+
+ assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
+ assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
+ assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void nonColocationFollowsCoLocation() {
+ try {
+ JobVertexID jid1 = new JobVertexID();
+ JobVertexID jid2 = new JobVertexID();
+
+ Scheduler scheduler = new Scheduler();
+
+ Instance i1 = getRandomInstance(1);
+ Instance i2 = getRandomInstance(1);
+
+ scheduler.newInstanceAvailable(i2);
+ scheduler.newInstanceAvailable(i1);
+
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+
+ SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+ CoLocationGroup ccg = new CoLocationGroup();
+ CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
+ CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
+
+ AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+ AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
+
+ AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup));
+ AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup));
+
+ // check that each slot got two
+ assertEquals(2, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
+ assertEquals(2, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
+
+ s1.releaseSlot();
+ s2.releaseSlot();
+ s3.releaseSlot();
+ s4.releaseSlot();
+
+ assertEquals(2, scheduler.getNumberOfAvailableSlots());
+
+ assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
+ assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
+ assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index d2e7598..09a416e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -101,6 +101,7 @@ public class SchedulerTestUtils {
when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
when(vertex.toString()).thenReturn("TEST-VERTEX");
+ when(vertex.getSimpleName()).thenReturn("TEST-VERTEX");
Execution execution = mock(Execution.class);
when(execution.getVertex()).thenReturn(vertex);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
index de22999..7859b33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
@@ -19,14 +19,13 @@
package org.apache.flink.runtime.jobmanager.scheduler;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-
import static org.junit.Assert.*;
import org.junit.Test;
-
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -37,7 +36,16 @@ public class SharedSlotsTest {
public void createAndDoNotRelease() {
try {
SlotSharingGroupAssignment assignment = mock(SlotSharingGroupAssignment.class);
- when(assignment.sharedSlotAvailableForJid(any(SharedSlot.class), any(JobVertexID.class), any(boolean.class))).thenReturn(false);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ final SubSlot sub = (SubSlot) invocation.getArguments()[0];
+ final SharedSlot shared = (SharedSlot) invocation.getArguments()[1];
+ shared.releaseSlot(sub);
+ return null;
+ }
+
+ }).when(assignment).releaseSubSlot(any(SubSlot.class), any(SharedSlot.class));
Instance instance = SchedulerTestUtils.getRandomInstance(1);
@@ -77,8 +85,18 @@ public class SharedSlotsTest {
public void createAndRelease() {
try {
SlotSharingGroupAssignment assignment = mock(SlotSharingGroupAssignment.class);
- when(assignment.sharedSlotAvailableForJid(any(SharedSlot.class), any(JobVertexID.class), eq(false))).thenReturn(false);
- when(assignment.sharedSlotAvailableForJid(any(SharedSlot.class), any(JobVertexID.class), eq(true))).thenReturn(true);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ final SubSlot sub = (SubSlot) invocation.getArguments()[0];
+ final SharedSlot shared = (SharedSlot) invocation.getArguments()[1];
+ if (shared.releaseSlot(sub) == 0) {
+ shared.dispose();
+ }
+ return null;
+ }
+
+ }).when(assignment).releaseSubSlot(any(SubSlot.class), any(SharedSlot.class));
Instance instance = SchedulerTestUtils.getRandomInstance(1);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cdee8750/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java
index 3b4f293..61ac104 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java
@@ -184,7 +184,7 @@ public final class ServerTestUtils {
public static void waitForJobManagerToBecomeReady(final ExtendedManagementProtocol jobManager) throws IOException,
InterruptedException {
- while (jobManager.getAvailableSlots() == 0) {
+ while (jobManager.getTotalNumberOfRegisteredSlots() == 0) {
Thread.sleep(100);
}
}