You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/27 17:01:20 UTC
[2/7] flink git commit: [FLINK-1952] [jobmanager] Rework and fix slot
sharing scheduler
http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 9b2000b..579a6b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -36,27 +36,39 @@ import akka.dispatch.Futures;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.instance.SharedSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * The scheduler is responsible for distributing the ready-to-run tasks and assigning them to instances and
- * slots.
+ * The scheduler is responsible for distributing the ready-to-run tasks among instances and slots.
+ *
+ * <p>The scheduler supports two scheduling modes:</p>
+ * <ul>
+ * <li>Immediate scheduling: A request for a task slot immediately returns a task slot, if one is
+ * available, or throws a {@link NoResourceAvailableException}.</li>
+ * <li>Queued Scheduling: A request for a task slot is queued and returns a future that will be
+ * fulfilled as soon as a slot becomes available.</li>
+ * </ul>
*/
public class Scheduler implements InstanceListener, SlotAvailabilityListener {
- static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
+ /** Scheduler-wide logger */
+ private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
+ /** All modifications to the scheduler structures are performed under a global scheduler lock */
private final Object globalLock = new Object();
/** All instances that the scheduler can deploy to */
@@ -71,20 +83,24 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
/** All tasks pending to be scheduled */
private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();
- private final BlockingQueue<Instance> newlyAvailableInstances;
+ private final BlockingQueue<Instance> newlyAvailableInstances = new LinkedBlockingQueue<Instance>();
+ /** The number of slot allocations that had no location preference */
private int unconstrainedAssignments;
-
+
+ /** The number of slot allocations where locality could be respected */
private int localizedAssignments;
-
+
+ /** The number of slot allocations where locality could not be respected */
private int nonLocalizedAssignments;
-
-
- public Scheduler() {
- this.newlyAvailableInstances = new LinkedBlockingQueue<Instance>();
- }
-
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new scheduler.
+ */
+ public Scheduler() {}
/**
* Shuts the scheduler down. After shut down no more tasks can be added to the scheduler.
@@ -102,9 +118,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
}
}
- // --------------------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
// Scheduling
- // --------------------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
public SimpleSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
Object ret = scheduleTask(task, false);
@@ -130,13 +146,12 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
}
/**
- * Returns either an {@link org.apache.flink.runtime.instance.SimpleSlot}, or an {@link SlotAllocationFuture}.
+ * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link SlotAllocationFuture}.
*/
private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
if (task == null) {
- throw new IllegalArgumentException();
+ throw new NullPointerException();
}
-
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling task " + task);
}
@@ -148,14 +163,16 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
preferredLocations != null && preferredLocations.iterator().hasNext();
synchronized (globalLock) {
-
- // 1) === If the task has a slot sharing group, schedule with shared slots ===
SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
+
if (sharingUnit != null) {
+
+ // 1) === If the task has a slot sharing group, schedule with shared slots ===
if (queueIfNoResource) {
- throw new IllegalArgumentException("A task with a vertex sharing group was scheduled in a queued fashion.");
+ throw new IllegalArgumentException(
+ "A task with a vertex sharing group was scheduled in a queued fashion.");
}
final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
@@ -163,12 +180,12 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
// sanity check that we do not use an externally forced location and a co-location constraint together
if (constraint != null && forceExternalLocation) {
- throw new IllegalArgumentException("The scheduling cannot be contrained simultaneously by a "
- + "co-location constriaint and an external location constraint.");
+ throw new IllegalArgumentException("The scheduling cannot be constrained simultaneously by a "
+ + "co-location constraint and an external location constraint.");
}
// get a slot from the group, if the group has one for us (and can fulfill the constraint)
- SimpleSlot slotFromGroup;
+ final SimpleSlot slotFromGroup;
if (constraint == null) {
slotFromGroup = assignment.getSlotForTask(vertex);
}
@@ -182,74 +199,87 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
// the following needs to make sure any allocated slot is released in case of an error
try {
- // check whether the slot from the group is already what we want
- if (slotFromGroup != null) {
- // local (or unconstrained in the current group)
- if (slotFromGroup.getLocality() != Locality.NON_LOCAL) {
- updateLocalityCounters(slotFromGroup.getLocality());
- return slotFromGroup;
+ // check whether the slot from the group is already what we want.
+ // any slot that is local, or where the assignment was unconstrained is good!
+ if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) {
+
+ // if this is the first slot for the co-location constraint, we lock
+ // the location, because we are quite happy with the slot
+ if (constraint != null && !constraint.isAssigned()) {
+ constraint.lockLocation();
}
+
+ updateLocalityCounters(slotFromGroup.getLocality(), vertex, slotFromGroup.getInstance());
+ return slotFromGroup;
}
- final Iterable<Instance> locations = (constraint == null || constraint.isUnassigned()) ?
- vertex.getPreferredLocations() : Collections.singleton(constraint.getLocation());
+ // the group did not have a local slot for us. see if we can get one (or a better one)
+
+ // our location preference is either determined by the location constraint, or by the
+ // vertex's preferred locations
+ final Iterable<Instance> locations;
+ final boolean localOnly;
+ if (constraint != null && constraint.isAssigned()) {
+ locations = Collections.singleton(constraint.getLocation());
+ localOnly = true;
+ }
+ else {
+ locations = vertex.getPreferredLocations();
+ localOnly = forceExternalLocation;
+ }
- // get a new slot, since we could not place it into the group, or we could not place it locally
- newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint, forceExternalLocation);
+ newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly);
if (newSlot == null) {
if (slotFromGroup == null) {
- // both null
- if (constraint == null || constraint.isUnassigned()) {
- if (forceExternalLocation) {
- // could not satisfy the external location constraint
- String hosts = getHostnamesFromInstances(preferredLocations);
- throw new NoResourceAvailableException("Could not schedule task " + vertex
- + " to any of the required hosts: " + hosts);
- }
- else {
- // simply nothing is available
- throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(),
- getTotalNumberOfSlots(), getNumberOfAvailableSlots());
- }
+ // both null, which means there is nothing available at all
+
+ if (constraint != null && constraint.isAssigned()) {
+ // nothing is available on the node where the co-location constraint forces us to
+ throw new NoResourceAvailableException("Could not allocate a slot on instance " +
+ constraint.getLocation() + ", as required by the co-location constraint.");
+ }
+ else if (forceExternalLocation) {
+ // could not satisfy the external location constraint
+ String hosts = getHostnamesFromInstances(preferredLocations);
+ throw new NoResourceAvailableException("Could not schedule task " + vertex
+ + " to any of the required hosts: " + hosts);
}
else {
- // nothing is available on the node where the co-location constraint pushes us
- throw new NoResourceAvailableException("Could not allocate a slot on instance " +
- constraint.getLocation() + ", as required by the co-location constraint.");
+ // simply nothing is available
+ throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(),
+ getTotalNumberOfSlots(), getNumberOfAvailableSlots());
}
- } else {
- // got a non-local from the group, and no new one
+ }
+ else {
+ // got a non-local from the group, and no new one, so we use the non-local
+ // slot from the sharing group
toUse = slotFromGroup;
}
}
- else if (slotFromGroup == null || newSlot.getLocality() == Locality.LOCAL) {
- // new slot is preferable
+ else if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
+ // if there is no slot from the group, the slot from the group is no longer alive,
+ // or the new slot is local, then we use the new slot
if (slotFromGroup != null) {
slotFromGroup.releaseSlot();
}
-
toUse = newSlot;
}
else {
- // both are available and usable. neither is local
+ // both are available and usable. neither is local. in that case, we may
+ // as well use the slot from the sharing group, to minimize the number of
+ // instances that the job occupies
newSlot.releaseSlot();
toUse = slotFromGroup;
}
-
- // assign to the co-location hint, if we have one and it is unassigned
- // if it was assigned before and the new one is not local, it is a fail
- if (constraint != null) {
- if (constraint.isUnassigned() || toUse.getLocality() == Locality.LOCAL) {
- constraint.setSharedSlot(toUse.getParent());
- } else {
- // the fail
- throw new NoResourceAvailableException("Could not allocate a slot on instance " +
- constraint.getLocation() + ", as required by the co-location constraint.");
- }
+
+ // if this is the first slot for the co-location constraint, we lock
+ // the location, because we are going to use that slot
+ if (constraint != null && !constraint.isAssigned()) {
+ constraint.lockLocation();
}
- updateLocalityCounters(toUse.getLocality());
+ updateLocalityCounters(toUse.getLocality(), vertex, toUse.getInstance());
}
catch (NoResourceAvailableException e) {
throw e;
@@ -266,11 +296,14 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
}
return toUse;
- } else {
+ }
+ else {
+
// 2) === schedule without hints and sharing ===
+
SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
if (slot != null) {
- updateLocalityCounters(slot.getLocality());
+ updateLocalityCounters(slot.getLocality(), vertex, slot.getInstance());
return slot;
}
else {
@@ -286,30 +319,14 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
+ " to any of the required hosts: " + hosts);
}
else {
- throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
+ throw new NoResourceAvailableException(getNumberOfAvailableInstances(),
+ getTotalNumberOfSlots(), getNumberOfAvailableSlots());
}
}
}
}
}
- private String getHostnamesFromInstances(Iterable<Instance> instances) {
- StringBuilder bld = new StringBuilder();
-
- for (Instance i : instances) {
- bld.append(i.getInstanceConnectionInfo().getHostname());
- bld.append(", ");
- }
-
- if (bld.length() == 0) {
- return "";
- }
- else {
- bld.setLength(bld.length() - 2);
- return bld.toString();
- }
- }
-
/**
* Gets a suitable instance to schedule the vertex execution to.
* <p>
@@ -318,8 +335,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
* @param vertex The task to run.
* @return The instance to run the vertex on, it {@code null}, if no instance is available.
*/
- protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations, boolean localOnly) {
-
+ protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
+ Iterable<Instance> requestedLocations,
+ boolean localOnly) {
// we need potentially to loop multiple times, because there may be false positives
// in the set-with-available-instances
while (true) {
@@ -332,18 +350,8 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
Instance instanceToUse = instanceLocalityPair.getLeft();
Locality locality = instanceLocalityPair.getRight();
- if (LOG.isDebugEnabled()){
- if(locality == Locality.LOCAL){
- LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
- }else if(locality == Locality.NON_LOCAL){
- LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
- }else if(locality == Locality.UNCONSTRAINED) {
- LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
- }
- }
-
try {
- SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId(), vertex.getJobvertexId());
+ SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId());
// if the instance has further available slots, re-add it to the set of available resources.
if (instanceToUse.hasResourcesAvailable()) {
@@ -365,54 +373,64 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
}
}
- protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex,
- Iterable<Instance> requestedLocations,
- SlotSharingGroupAssignment groupAssignment,
- CoLocationConstraint constraint,
- boolean localOnly) {
+ /**
+ * Tries to allocate a new slot for a vertex that is part of a slot sharing group. If one
+ * of the instances has a slot available, the method will allocate it as a shared slot, add that
+ * shared slot to the sharing group, and allocate a simple slot from that shared slot.
+ *
+ * <p>This method will try to allocate a slot from one of the local instances, and fall back to
+ * non-local instances, if permitted.</p>
+ *
+ * @param vertex The vertex to allocate the slot for.
+ * @param requestedLocations The locations that are considered local. May be null or empty, if the
+ * vertex has no location preferences.
+ * @param groupAssignment The slot sharing group of the vertex. Mandatory parameter.
+ * @param constraint The co-location constraint of the vertex. May be null.
+ * @param localOnly Flag to indicate that only local instances are acceptable (no fallback to non-local ones).
+ *
+ * @return A sub-slot for the given vertex, or {@code null}, if no slot is available.
+ */
+ protected SimpleSlot getNewSlotForSharingGroup(ExecutionVertex vertex,
+ Iterable<Instance> requestedLocations,
+ SlotSharingGroupAssignment groupAssignment,
+ CoLocationConstraint constraint,
+ boolean localOnly)
+ {
// we need potentially to loop multiple times, because there may be false positives
// in the set-with-available-instances
while (true) {
Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);
-
+
if (instanceLocalityPair == null) {
+ // nothing is available
return null;
}
- Instance instanceToUse = instanceLocalityPair.getLeft();
- Locality locality = instanceLocalityPair.getRight();
-
- if (LOG.isDebugEnabled()) {
- if (locality == Locality.LOCAL) {
- LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
- } else if(locality == Locality.NON_LOCAL) {
- LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
- } else if(locality == Locality.UNCONSTRAINED) {
- LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
- }
- }
+ final Instance instanceToUse = instanceLocalityPair.getLeft();
+ final Locality locality = instanceLocalityPair.getRight();
try {
- AbstractID groupID = constraint == null ? vertex.getJobvertexId() : constraint.getGroupId();
-
- // root SharedSlot
- SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment, groupID);
+ JobVertexID groupID = vertex.getJobvertexId();
+
+ // allocate a shared slot from the instance
+ SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment);
// if the instance has further available slots, re-add it to the set of available resources.
if (instanceToUse.hasResourcesAvailable()) {
this.instancesWithAvailableResources.add(instanceToUse);
}
- if(sharedSlot != null){
- // If constraint != null, then slot nested in a SharedSlot nested in sharedSlot
- // If constraint == null, then slot nested in sharedSlot
- SimpleSlot slot = groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot,
- locality, groupID, constraint);
+ if (sharedSlot != null) {
+ // add the shared slot to the assignment group and allocate a sub-slot
+ SimpleSlot slot = constraint == null ?
+ groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, groupID) :
+ groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, constraint);
- if(slot != null){
+ if (slot != null) {
return slot;
- } else {
- // release shared slot
+ }
+ else {
+ // could not add and allocate the sub-slot, so release shared slot
sharedSlot.releaseSlot();
}
}
@@ -428,58 +446,56 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
}
/**
- * NOTE: This method is not thread-safe, it needs to be synchronized by the caller.
- *
* Tries to find a requested instance. If no such instance is available it will return a non-
* local instance. If no such instance exists (all slots occupied), then return null.
+ *
+ * <p><b>NOTE:</b> This method is not thread-safe, it needs to be synchronized by the caller.</p>
*
- * @param requestedLocations
+ * @param requestedLocations The list of preferred instances. May be null or empty, which indicates that
+ * no locality preference exists.
+ * @param localOnly Flag to indicate whether only one of the exact local instances can be chosen.
*/
private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations, boolean localOnly){
- if (this.instancesWithAvailableResources.isEmpty()) {
- // check if the asynchronous calls did not yet return the queues
+ // drain the queue of newly available instances
+ while (this.newlyAvailableInstances.size() > 0) {
Instance queuedInstance = this.newlyAvailableInstances.poll();
- if (queuedInstance == null) {
- return null;
- } else {
+ if (queuedInstance != null) {
this.instancesWithAvailableResources.add(queuedInstance);
}
}
+
+ // if nothing is available at all, return null
+ if (this.instancesWithAvailableResources.isEmpty()) {
+ return null;
+ }
Iterator<Instance> locations = requestedLocations == null ? null : requestedLocations.iterator();
- Instance instanceToUse = null;
- Locality locality = Locality.UNCONSTRAINED;
-
if (locations != null && locations.hasNext()) {
// we have a locality preference
while (locations.hasNext()) {
Instance location = locations.next();
-
if (location != null && this.instancesWithAvailableResources.remove(location)) {
- instanceToUse = location;
- locality = Locality.LOCAL;
- break;
+ return new ImmutablePair<Instance, Locality>(location, Locality.LOCAL);
}
}
-
- if (instanceToUse == null) {
- if (localOnly) {
- return null;
- }
- else {
- instanceToUse = this.instancesWithAvailableResources.poll();
- locality = Locality.NON_LOCAL;
- }
+
+ // no local instance available
+ if (localOnly) {
+ return null;
+ }
+ else {
+ Instance instanceToUse = this.instancesWithAvailableResources.poll();
+ return new ImmutablePair<Instance, Locality>(instanceToUse, Locality.NON_LOCAL);
}
}
else {
- instanceToUse = this.instancesWithAvailableResources.poll();
+ // no location preference, so use some instance
+ Instance instanceToUse = this.instancesWithAvailableResources.poll();
+ return new ImmutablePair<Instance, Locality>(instanceToUse, Locality.UNCONSTRAINED);
}
-
- return new ImmutablePair<Instance, Locality>(instanceToUse, locality);
}
@Override
@@ -524,7 +540,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
ExecutionVertex vertex = task.getTaskToExecute().getVertex();
try {
- SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId(), vertex.getJobvertexId());
+ SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId());
if (newSlot != null) {
// success, remove from the task queue and notify the future
@@ -554,7 +570,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
}
}
- private void updateLocalityCounters(Locality locality) {
+ private void updateLocalityCounters(Locality locality, ExecutionVertex vertex, Instance location) {
switch (locality) {
case UNCONSTRAINED:
this.unconstrainedAssignments++;
@@ -568,12 +584,22 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
default:
throw new RuntimeException(locality.name());
}
+
+ if (LOG.isDebugEnabled()) {
+ switch (locality) {
+ case UNCONSTRAINED:
+ LOG.debug("Unconstrained assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+ break;
+ case LOCAL:
+ LOG.debug("Local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+ break;
+ case NON_LOCAL:
+ LOG.debug("Non-local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+ break;
+ }
+ }
}
-
-
-
-
// --------------------------------------------------------------------------------------------
// Instance Availability
// --------------------------------------------------------------------------------------------
@@ -646,7 +672,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
if (instance == null) {
throw new NullPointerException();
}
-
+
allInstances.remove(instance);
instancesWithAvailableResources.remove(instance);
@@ -755,7 +781,36 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
}
}
}
+
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static String getHostnamesFromInstances(Iterable<Instance> instances) {
+ StringBuilder bld = new StringBuilder();
+
+ boolean successive = false;
+ for (Instance i : instances) {
+ if (successive) {
+ bld.append(", ");
+ } else {
+ successive = true;
+ }
+ bld.append(i.getInstanceConnectionInfo().getHostname());
+ }
+
+ return bld.toString();
+ }
+ // ------------------------------------------------------------------------
+ // Nested members
+ // ------------------------------------------------------------------------
+
+ /**
+ * An entry in the queue of schedule requests. Contains the task to be scheduled and
+ * the future that tracks the completion.
+ */
private static final class QueuedTask {
private final ScheduledUnit task;
http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
index dcde6b2..0fa1362 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
/**
@@ -47,7 +48,8 @@ public class SlotSharingGroup implements java.io.Serializable {
this.ids.add(id);
}
}
-
+
+ // --------------------------------------------------------------------------------------------
public void addVertexToGroup(JobVertexID id) {
this.ids.add(id);
@@ -79,7 +81,9 @@ public class SlotSharingGroup implements java.io.Serializable {
this.taskAssignment = null;
}
- // --------------------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
deleted file mode 100644
index 3c292f7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
+++ /dev/null
@@ -1,485 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.SharedSlot;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.Slot;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.slf4j.Logger;
-
-
-public class SlotSharingGroupAssignment {
-
- private static final Logger LOG = Scheduler.LOG;
-
- private final Object lock = new Object();
-
- /** All slots currently allocated to this sharing group */
- private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
-
- /** The slots available per vertex type (jid), keyed by instance, to make them locatable */
- private final Map<AbstractID, Map<Instance, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<AbstractID, Map<Instance, List<SharedSlot>>>();
-
- // --------------------------------------------------------------------------------------------
-
- public SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot sharedSlot, Locality locality,
- AbstractID groupId, CoLocationConstraint constraint) {
-
- final Instance location = sharedSlot.getInstance();
-
- synchronized (lock) {
- // add to the total bookkeeping
- allSlots.add(sharedSlot);
-
- SimpleSlot subSlot = null;
-
- if (constraint == null) {
- // allocate us a sub slot to return
- subSlot = sharedSlot.allocateSubSlot(groupId);
- } else {
- // we need a colocation slot --> a SimpleSlot nested in a SharedSlot to host other colocated tasks
- SharedSlot constraintGroupSlot = sharedSlot.allocateSharedSlot(groupId);
-
- if(constraintGroupSlot == null) {
- subSlot = null;
- } else {
- subSlot = constraintGroupSlot.allocateSubSlot(null);
-
- // could not create a sub slot --> release constraintGroupSlot
- if(subSlot == null){
- constraintGroupSlot.releaseSlot();
- }
- }
- }
-
- // if sharedSlot is dead, but this should never happen since we just created a fresh
- // SharedSlot in the caller
- if(subSlot == null) {
- LOG.warn("Could not allocate a sub slot.");
-
- return null;
- } else {
- // preserve the locality information
- subSlot.setLocality(locality);
-
- boolean entryForNewJidExists = false;
-
- // let the other vertex types know about this one as well
- for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
-
- if (entry.getKey().equals(groupId)) {
- entryForNewJidExists = true;
- continue;
- }
-
- Map<Instance, List<SharedSlot>> available = entry.getValue();
- putIntoMultiMap(available, location, sharedSlot);
- }
-
- // make sure an empty entry exists for this group, if no other entry exists
- if (!entryForNewJidExists) {
- availableSlotsPerJid.put(groupId, new LinkedHashMap<Instance, List<SharedSlot>>());
- }
-
- return subSlot;
- }
- }
- }
-
- /**
- * Gets a slot suitable for the given task vertex. This method will prefer slots that are local
- * (with respect to {@link ExecutionVertex#getPreferredLocations()}), but will return non local
- * slots if no local slot is available. The method returns null, when no slot is available for the
- * given JobVertexID at all.
- *
- * @param vertex
- *
- * @return A task vertex for a task with the given JobVertexID, or null, if none is available.
- */
- public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
- synchronized (lock) {
- Pair<SharedSlot, Locality> p = getSlotForTaskInternal(vertex.getJobvertexId(), vertex, vertex.getPreferredLocations(), false);
-
- if (p != null) {
- SharedSlot ss = p.getLeft();
- SimpleSlot slot = ss.allocateSubSlot(vertex.getJobvertexId());
- slot.setLocality(p.getRight());
- return slot;
- }
- else {
- return null;
- }
- }
-
- }
-
- public SimpleSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
-
- synchronized (lock) {
- SharedSlot shared = constraint.getSharedSlot();
-
- if (shared != null && !shared.isDead()) {
- // initialized and set
- SimpleSlot subslot = shared.allocateSubSlot(null);
- subslot.setLocality(Locality.LOCAL);
- return subslot;
- }
- else if (shared == null) {
- // not initialized, grab a new slot. preferred locations are defined by the vertex
- // we only associate the slot with the constraint, if it was a local match
- Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(), vertex, vertex.getPreferredLocations(), false);
-
- if (p == null) {
- return null;
- } else {
- shared = p.getLeft();
- Locality l = p.getRight();
-
- // we need a colocation slot --> SimpleSlot nested in a SharedSlot to host other colocated tasks
- SharedSlot constraintGroupSlot = shared.allocateSharedSlot(constraint.getGroupId());
- // Depth=3 => groupID==null
- SimpleSlot sub = constraintGroupSlot.allocateSubSlot(null);
- sub.setLocality(l);
-
- if (l != Locality.NON_LOCAL) {
- constraint.setSharedSlot(constraintGroupSlot);
- }
- return sub;
- }
- }
- else {
- // disposed. get a new slot on the same instance
- Instance location = shared.getInstance();
- Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(), vertex, Collections.singleton(location), true);
-
- if (p == null) {
- return null;
- } else {
- shared = p.getLeft();
- // we need colocation slot --> SimpleSlot nested in a SharedSlot to host other colocated tasks
- SharedSlot constraintGroupSlot = shared.allocateSharedSlot(constraint.getGroupId());
- constraint.setSharedSlot(constraintGroupSlot);
- SimpleSlot subSlot = constraintGroupSlot.allocateSubSlot(null);
- subSlot.setLocality(Locality.LOCAL);
- return subSlot;
- }
- }
- }
- }
-
- /**
- * NOTE: This method is not synchronized by itself, needs to be synchronized externally.
- *
- * @return An allocated sub slot, or {@code null}, if no slot is available.
- */
- private Pair<SharedSlot, Locality> getSlotForTaskInternal(AbstractID groupId, ExecutionVertex vertex, Iterable<Instance> preferredLocations, boolean localOnly) {
- Map<Instance, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
-
- if (allSlots.isEmpty()) {
- return null;
- }
-
- // get the available slots for the group
- if (slotsForGroup == null) {
- // no task is yet scheduled for that group, so all slots are available
- slotsForGroup = new LinkedHashMap<Instance, List<SharedSlot>>();
- availableSlotsPerJid.put(groupId, slotsForGroup);
-
- for (SharedSlot availableSlot : allSlots) {
- putIntoMultiMap(slotsForGroup, availableSlot.getInstance(), availableSlot);
- }
- }
- else if (slotsForGroup.isEmpty()) {
- return null;
- }
-
- // check whether we can schedule the task to a preferred location
- boolean didNotGetPreferred = false;
-
- if (preferredLocations != null) {
- for (Instance location : preferredLocations) {
-
- // set the flag that we failed a preferred location. If one will be found,
- // we return early anyways and skip the flag evaluation
- didNotGetPreferred = true;
-
- SharedSlot slot = removeFromMultiMap(slotsForGroup, location);
- if (slot != null && !slot.isDead()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Local assignment in shared group : " + vertex + " --> " + slot);
- }
-
- return new ImmutablePair<SharedSlot, Locality>(slot, Locality.LOCAL);
- }
- }
- }
-
- // if we want only local assignments, exit now with a "not found" result
- if (didNotGetPreferred && localOnly) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No local assignment in shared possible for " + vertex);
- }
- return null;
- }
-
- // schedule the task to any available location
- SharedSlot slot = pollFromMultiMap(slotsForGroup);
- if (slot != null && !slot.isDead()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug((didNotGetPreferred ? "Non-local" : "Unconstrained") + " assignment in shared group : " + vertex + " --> " + slot);
- }
-
- return new ImmutablePair<SharedSlot, Locality>(slot, didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
- }
- else {
- return null;
- }
- }
-
- /**
- * Removes the shared slot from the assignment group.
- *
- * @param sharedSlot
- */
- private void removeSharedSlot(SharedSlot sharedSlot){
- if (!allSlots.contains(sharedSlot)) {
- throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
- }
-
- allSlots.remove(sharedSlot);
-
- Instance location = sharedSlot.getInstance();
-
- for(Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> mapEntry: availableSlotsPerJid.entrySet()){
- Map<Instance, List<SharedSlot>> map = mapEntry.getValue();
-
- List<SharedSlot> list = map.get(location);
-
- if(list == null || !list.remove(sharedSlot)){
- throw new IllegalStateException("Bug: SharedSlot was not available to another vertex type that it was not allocated for before.");
- }
-
- if(list.isEmpty()){
- map.remove(location);
- }
- }
-
- sharedSlot.markCancelled();
-
- returnAllocatedSlot(sharedSlot);
- }
-
- /**
- * Releases the shared slot from the assignment group.
- * @param sharedSlot The SharedSlot to be released
- */
- public void releaseSharedSlot(SharedSlot sharedSlot){
- synchronized (lock) {
- Set<Slot> subSlots = sharedSlot.getSubSlots();
-
- for(Slot subSlot: subSlots) {
-
- subSlot.markDisposed();
-
- if(subSlot instanceof SharedSlot){
- releaseSharedSlot((SharedSlot) subSlot);
- }else if(subSlot instanceof SimpleSlot){
- releaseSimpleSlot((SimpleSlot) subSlot);
- }
- }
-
- subSlots.clear();
-
- returnSlot(sharedSlot);
- }
- }
-
- /**
- * Releases the simple slot from the assignment group.
- * @param simpleSlot The SimpleSlot to be released
- */
- public void releaseSimpleSlot(SimpleSlot simpleSlot){
- synchronized (lock) {
- simpleSlot.cancel();
-
- returnSlot(simpleSlot);
- }
-
- }
-
- /**
- * Removes the given slot from the assignment group. If the slot is a root object, then it has
- * to be a SharedSlot and it is removed from the availableSlotsPerJid field and the slot is
- * returned to the instance. If the slot is a sub slot of the root slot, then this sub slot
- * is marked available again for tasks of the same group. Otherwise, the slot is simply removed
- * from its parent if it is not already marked as disposed. If a slot is already marked to be
- * disposed, then the releasing was called from a parent slot which will take care of the
- * disposal.
- *
- * IMPORTANT: The method is not synchronized. The caller is responsible for that.
- *
- * @param slot The slot to be returned.
- */
- private void returnSlot(Slot slot){
- // each slot can only be returned once, if a slot is returned then it should no longer be used --> markDead
- if(slot.markDead()) {
- // slot is a root slot
- if(slot.getParent() == null){
- // only SharedSlots are allowed to be root slots in a SlotSharingGroupAssignment
- if(slot instanceof SharedSlot){
- removeSharedSlot((SharedSlot) slot);
- } else {
- throw new IllegalStateException("Simple slot cannot be returned from SlotSharingGroupAssignment.");
- }
- } else {
- AbstractID groupID = slot.getGroupID();
- SharedSlot parent = slot.getParent();
-
- // Only colocation constraint slots (SimpleSlot nested in a SharedSlot nested in a SharedSlot) have a groupID==null
- // One can also say, all nested slots deeper than 2 have a groupID==null
- if(groupID != null){
- if (!allSlots.contains(parent)) {
- throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
- }
-
- // make the shared slot available to tasks within the group it available to
- Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
-
- // sanity check
- if (slotsForJid == null) {
- throw new IllegalStateException("Trying to return a slot for group " + groupID +
- " when available slots indicated that all slots were available.");
- }
-
- putIntoMultiMap(slotsForJid, parent.getInstance(), parent);
- }
-
- // if no one else takes care of disposal, then remove the slot from the parent
- if(slot.markDisposed()) {
- if (slot.getParent().freeSubSlot(slot) == 0) {
- releaseSharedSlot(slot.getParent());
- }
- }
- }
- }
- }
-
- private void returnAllocatedSlot(SharedSlot slot){
- slot.getInstance().returnAllocatedSlot(slot);
- }
-
- // --------------------------------------------------------------------------------------------
- // State
- // --------------------------------------------------------------------------------------------
-
- public int getNumberOfSlots() {
- return allSlots.size();
- }
-
- public int getNumberOfAvailableSlotsForJid(JobVertexID jid) {
- synchronized (lock) {
- Map<Instance, List<SharedSlot>> available = availableSlotsPerJid.get(jid);
-
- if (available != null) {
- Set<SharedSlot> set = new HashSet<SharedSlot>();
-
- for (List<SharedSlot> list : available.values()) {
- for (SharedSlot slot : list) {
- set.add(slot);
- }
- }
-
- return set.size();
- } else {
- return allSlots.size();
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
-
-
- // --------------------------------------------------------------------------------------------
- // Utilities
- // --------------------------------------------------------------------------------------------
-
- private static final void putIntoMultiMap(Map<Instance, List<SharedSlot>> map, Instance location, SharedSlot slot) {
- List<SharedSlot> slotsForInstance = map.get(location);
- if (slotsForInstance == null) {
- slotsForInstance = new ArrayList<SharedSlot>();
- map.put(location, slotsForInstance);
- }
- slotsForInstance.add(slot);
- }
-
- private static final SharedSlot removeFromMultiMap(Map<Instance, List<SharedSlot>> map, Instance location) {
- List<SharedSlot> slotsForLocation = map.get(location);
-
- if (slotsForLocation == null) {
- return null;
- }
- else {
- SharedSlot slot = slotsForLocation.remove(slotsForLocation.size() - 1);
- if (slotsForLocation.isEmpty()) {
- map.remove(location);
- }
-
- return slot;
- }
- }
-
- private static final SharedSlot pollFromMultiMap(Map<Instance, List<SharedSlot>> map) {
- Iterator<Map.Entry<Instance, List<SharedSlot>>> iter = map.entrySet().iterator();
-
- while (iter.hasNext()) {
- List<SharedSlot> slots = iter.next().getValue();
-
- if (slots.isEmpty()) {
- iter.remove();
- }
- else if (slots.size() == 1) {
- SharedSlot slot = slots.remove(0);
- iter.remove();
- return slot;
- }
- else {
- return slots.remove(slots.size() - 1);
- }
- }
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 42c3f84..a53c318 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -348,4 +348,4 @@ public class ExecutionGraphDeploymentTest {
throw new Exception();
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 1e78ac5..733ad11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -60,31 +60,31 @@ public class ExecutionStateProgressTest {
try {
final JobID jid = new JobID();
final JobVertexID vid = new JobVertexID();
-
+
AbstractJobVertex ajv = new AbstractJobVertex("TestVertex", vid);
ajv.setParallelism(3);
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-
+
ExecutionGraph graph = new ExecutionGraph(jid, "test job", new Configuration(),
AkkaUtils.getDefaultTimeout());
graph.attachJobGraph(Arrays.asList(ajv));
-
+
setGraphStatus(graph, JobStatus.RUNNING);
-
+
ExecutionJobVertex ejv = graph.getJobVertex(vid);
-
+
// mock resources and mock taskmanager
ActorRef taskManager = system.actorOf(Props.create(SimpleAcknowledgingTaskManager.class));
for (ExecutionVertex ee : ejv.getTaskVertices()) {
SimpleSlot slot = getInstance(taskManager).allocateSimpleSlot(jid);
ee.deployToSlot(slot);
}
-
+
// finish all
for (ExecutionVertex ee : ejv.getTaskVertices()) {
ee.executionFinished();
}
-
+
assertTrue(ejv.isInFinalState());
assertEquals(JobStatus.FINISHED, graph.getState());
}
@@ -93,4 +93,4 @@ public class ExecutionStateProgressTest {
fail(e.getMessage());
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 757976d..eb0aab4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -45,13 +45,14 @@ import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@SuppressWarnings("serial")
public class ExecutionVertexCancelTest {
-
+
private static ActorSystem system;
@BeforeClass
@@ -68,24 +69,24 @@ public class ExecutionVertexCancelTest {
// --------------------------------------------------------------------------------------------
// Canceling in different states
// --------------------------------------------------------------------------------------------
-
+
@Test
public void testCancelFromCreated() {
try {
final JobVertexID jid = new JobVertexID();
final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
+
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
-
+
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
-
+
vertex.cancel();
-
+
assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
-
+
assertNull(vertex.getFailureCause());
-
+
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
@@ -95,25 +96,25 @@ public class ExecutionVertexCancelTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testCancelFromScheduled() {
try {
final JobVertexID jid = new JobVertexID();
final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
+
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
-
+
setVertexState(vertex, ExecutionState.SCHEDULED);
assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
-
+
vertex.cancel();
-
+
assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
-
+
assertNull(vertex.getFailureCause());
-
+
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
@@ -123,7 +124,7 @@ public class ExecutionVertexCancelTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testCancelConcurrentlyToDeploying_CallsNotOvertaking() {
new JavaTestKit(system){{
@@ -148,7 +149,7 @@ public class ExecutionVertexCancelTest {
new TaskOperationResult(execId, false))));
Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
vertex.deployToSlot(slot);
@@ -187,15 +188,17 @@ public class ExecutionVertexCancelTest {
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
- }catch(Exception e){
+ }
+ catch(Exception e) {
e.printStackTrace();
fail(e.getMessage());
- }finally{
+ }
+ finally {
TestingUtils.setGlobalExecutionContext();
}
}};
}
-
+
@Test
public void testCancelConcurrentlyToDeploying_CallsOvertaking() {
new JavaTestKit(system){
@@ -272,7 +275,7 @@ public class ExecutionVertexCancelTest {
}
};
}
-
+
@Test
public void testCancelFromRunning() {
new JavaTestKit(system) {
@@ -291,7 +294,7 @@ public class ExecutionVertexCancelTest {
TaskOperationResult(execId, true))));
Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
setVertexState(vertex, ExecutionState.RUNNING);
setVertexResource(vertex, slot);
@@ -319,7 +322,7 @@ public class ExecutionVertexCancelTest {
}
};
}
-
+
@Test
public void testRepeatedCancelFromRunning() {
new JavaTestKit(system) {
@@ -339,7 +342,7 @@ public class ExecutionVertexCancelTest {
TaskOperationResult(execId, true))));
Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
setVertexState(vertex, ExecutionState.RUNNING);
setVertexResource(vertex, slot);
@@ -375,7 +378,7 @@ public class ExecutionVertexCancelTest {
}
};
}
-
+
@Test
public void testCancelFromRunningDidNotFindTask() {
// this may happen when the task finished or failed while the call was in progress
@@ -395,7 +398,7 @@ public class ExecutionVertexCancelTest {
TaskOperationResult(execId, false))));
Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
setVertexState(vertex, ExecutionState.RUNNING);
setVertexResource(vertex, slot);
@@ -419,7 +422,7 @@ public class ExecutionVertexCancelTest {
}
};
}
-
+
@Test
public void testCancelCallFails() {
new JavaTestKit(system) {
@@ -436,7 +439,7 @@ public class ExecutionVertexCancelTest {
CancelSequenceTaskManagerCreator()));
Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
setVertexState(vertex, ExecutionState.RUNNING);
setVertexResource(vertex, slot);
@@ -463,7 +466,7 @@ public class ExecutionVertexCancelTest {
}
};
}
-
+
@Test
public void testSendCancelAndReceiveFail() {
new JavaTestKit(system) {
@@ -482,7 +485,7 @@ public class ExecutionVertexCancelTest {
)));
Instance instance = getInstance(taskManager);
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
setVertexState(vertex, ExecutionState.RUNNING);
setVertexResource(vertex, slot);
@@ -507,11 +510,11 @@ public class ExecutionVertexCancelTest {
}
};
}
-
+
// --------------------------------------------------------------------------------------------
// Actions after a vertex has been canceled or while canceling
// --------------------------------------------------------------------------------------------
-
+
@Test
public void testScheduleOrDeployAfterCancel() {
try {
@@ -521,26 +524,26 @@ public class ExecutionVertexCancelTest {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
setVertexState(vertex, ExecutionState.CANCELED);
-
+
assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
-
+
// 1)
// scheduling after being created should be tolerated (no exception) because
// it can occur as the result of races
{
Scheduler scheduler = mock(Scheduler.class);
vertex.scheduleForExecution(scheduler, false);
-
+
assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
}
-
+
// 2)
// deploying after canceling from CREATED needs to raise an exception, because
// the scheduler (or any caller) needs to know that the slot should be released
try {
Instance instance = getInstance(ActorRef.noSender());
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
+
vertex.deployToSlot(slot);
fail("Method should throw an exception");
}
@@ -553,60 +556,62 @@ public class ExecutionVertexCancelTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testActionsWhileCancelling() {
-
+
try {
final JobVertexID jid = new JobVertexID();
final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
+
// scheduling while canceling is an illegal state transition
try {
ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
setVertexState(vertex, ExecutionState.CANCELING);
-
+
Scheduler scheduler = mock(Scheduler.class);
vertex.scheduleForExecution(scheduler, false);
}
catch (Exception e) {
fail("should not throw an exception");
}
-
-
+
+
// deploying while in canceling state is illegal (should immediately go to canceled)
try {
ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
setVertexState(vertex, ExecutionState.CANCELING);
-
+
Instance instance = getInstance(ActorRef.noSender());
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
+
vertex.deployToSlot(slot);
fail("Method should throw an exception");
}
- catch (IllegalStateException e) {}
-
-
+ catch (IllegalStateException e) {
+ // that is what we expect
+ }
+
+
// fail while canceling
{
ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
-
+
Instance instance = getInstance(ActorRef.noSender());
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
+
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.CANCELING);
-
+
Exception failureCause = new Exception("test exception");
-
+
vertex.fail(failureCause);
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
assertEquals(failureCause, vertex.getFailureCause());
-
+
assertTrue(slot.isReleased());
}
}
@@ -651,4 +656,4 @@ public class ExecutionVertexCancelTest {
}
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index c08ae01..eadf328 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -28,19 +28,21 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class ExecutionVertexDeploymentTest {
+
private static ActorSystem system;
@BeforeClass
@@ -61,41 +63,43 @@ public class ExecutionVertexDeploymentTest {
TestingUtils.setCallingThreadDispatcher(system);
ActorRef tm = TestActorRef.create(system, Props.create(SimpleAcknowledgingTaskManager
.class));
-
+
+ final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
// mock taskmanager to simply accept the call
Instance instance = getInstance(tm);
+ final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
- final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
-
+
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
-
+
// no repeated scheduling
try {
vertex.deployToSlot(slot);
fail("Scheduled from wrong state");
}
- catch (IllegalStateException e) {}
-
+ catch (IllegalStateException e) {
+ // as expected
+ }
+
assertNull(vertex.getFailureCause());
-
+
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- }finally{
+ }
+ finally {
TestingUtils.setGlobalExecutionContext();
}
}
-
+
@Test
public void testDeployWithSynchronousAnswer() {
try {
@@ -105,31 +109,32 @@ public class ExecutionVertexDeploymentTest {
final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
Props.create(SimpleAcknowledgingTaskManager.class));
-
- final Instance instance = getInstance(simpleTaskManager);
- final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
+
+ final Instance instance = getInstance(simpleTaskManager);
+ final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
-
+
vertex.deployToSlot(slot);
-
+
assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
-
+
// no repeated scheduling
try {
vertex.deployToSlot(slot);
fail("Scheduled from wrong state");
}
- catch (IllegalStateException e) {}
-
+ catch (IllegalStateException e) {
+ // as expected
+ }
+
assertNull(vertex.getFailureCause());
-
+
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
@@ -137,48 +142,51 @@ public class ExecutionVertexDeploymentTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- }finally{
+ }
+ finally {
TestingUtils.setGlobalExecutionContext();
}
}
-
+
@Test
public void testDeployWithAsynchronousAnswer() {
try {
final JobVertexID jid = new JobVertexID();
+ final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
Props.create(SimpleAcknowledgingTaskManager.class));
-
- final Instance instance = getInstance(simpleTaskManager);
- final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ final Instance instance = getInstance(simpleTaskManager);
+ final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
-
+
vertex.deployToSlot(slot);
-
+
// no repeated scheduling
try {
vertex.deployToSlot(slot);
fail("Scheduled from wrong state");
}
- catch (IllegalStateException e) {}
+ catch (IllegalStateException e) {
+ // as expected
+ }
assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
-
+
// no repeated scheduling
try {
vertex.deployToSlot(slot);
fail("Scheduled from wrong state");
}
- catch (IllegalStateException e) {}
-
+ catch (IllegalStateException e) {
+ // as expected
+ }
+
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
@@ -188,33 +196,32 @@ public class ExecutionVertexDeploymentTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testDeployFailedSynchronous() {
try {
TestingUtils.setCallingThreadDispatcher(system);
final JobVertexID jid = new JobVertexID();
+ final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
Props.create(SimpleFailingTaskManager.class));
-
+
final Instance instance = getInstance(simpleTaskManager);
- final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
-
+
vertex.deployToSlot(slot);
-
+
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
assertNotNull(vertex.getFailureCause());
assertTrue(vertex.getFailureCause().getMessage().contains(ERROR_MESSAGE));
-
+
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
@@ -222,31 +229,30 @@ public class ExecutionVertexDeploymentTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- }finally{
+ }
+ finally {
TestingUtils.setGlobalExecutionContext();
}
}
-
+
@Test
public void testDeployFailedAsynchronously() {
try {
final JobVertexID jid = new JobVertexID();
+ final ExecutionJobVertex ejv = getExecutionVertex(jid);
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
Props.create(SimpleFailingTaskManager.class));
-
- final Instance instance = getInstance(simpleTaskManager);
- final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ final Instance instance = getInstance(simpleTaskManager);
+ final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
-
+
vertex.deployToSlot(slot);
-
+
// wait until the state transition must be done
for (int i = 0; i < 100; i++) {
if (vertex.getExecutionState() == ExecutionState.FAILED && vertex.getFailureCause() != null) {
@@ -255,11 +261,11 @@ public class ExecutionVertexDeploymentTest {
Thread.sleep(10);
}
}
-
+
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
assertNotNull(vertex.getFailureCause());
assertTrue(vertex.getFailureCause().getMessage().contains(ERROR_MESSAGE));
-
+
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
@@ -269,11 +275,15 @@ public class ExecutionVertexDeploymentTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testFailExternallyDuringDeploy() {
try {
final JobVertexID jid = new JobVertexID();
+ final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
final ActionQueue queue = new ActionQueue();
final TestingUtils.QueuedActionExecutionContext ec = new TestingUtils
@@ -284,12 +294,7 @@ public class ExecutionVertexDeploymentTest {
final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
Props.create(SimpleAcknowledgingTaskManager.class));
final Instance instance = getInstance(simpleTaskManager);
- final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
- final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
@@ -306,17 +311,19 @@ public class ExecutionVertexDeploymentTest {
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- }finally{
+ }
+ finally {
TestingUtils.setGlobalExecutionContext();
}
}
-
+
@Test
public void testFailCallOvertakesDeploymentAnswer() {
-
+
try {
ActionQueue queue = new ActionQueue();
TestingUtils.QueuedActionExecutionContext context = new TestingUtils
@@ -329,6 +336,7 @@ public class ExecutionVertexDeploymentTest {
final ExecutionJobVertex ejv = getExecutionVertex(jid);
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
+
final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId();
final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system, Props.create(new
@@ -336,16 +344,16 @@ public class ExecutionVertexDeploymentTest {
TaskOperationResult(eid, false), new TaskOperationResult(eid, true))));
final Instance instance = getInstance(simpleTaskManager);
- final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
-
+
Exception testError = new Exception("test error");
vertex.fail(testError);
-
+
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
// cancel call overtakes deploy call
@@ -361,7 +369,7 @@ public class ExecutionVertexDeploymentTest {
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
assertEquals(testError, vertex.getFailureCause());
-
+
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
@@ -371,8 +379,9 @@ public class ExecutionVertexDeploymentTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- }finally{
+ }
+ finally {
TestingUtils.setGlobalExecutionContext();
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 06f0e9d..1e9c30b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -28,17 +28,17 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
-
import org.apache.flink.runtime.testingUtils.TestingUtils;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -46,6 +46,7 @@ import org.junit.Test;
import org.mockito.Matchers;
public class ExecutionVertexSchedulingTest {
+
private static ActorSystem system;
@BeforeClass
@@ -58,93 +59,91 @@ public class ExecutionVertexSchedulingTest {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
-
+
@Test
public void testSlotReleasedWhenScheduledImmediately() {
-
try {
- // a slot than cannot be deployed to
- final Instance instance = getInstance(ActorRef.noSender());
- final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
- slot.cancel();
- assertFalse(slot.isReleased());
-
final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
+
+ // a slot that cannot be deployed to
+ final Instance instance = getInstance(ActorRef.noSender());
+ final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+ slot.releaseSlot();
+ assertTrue(slot.isReleased());
+
Scheduler scheduler = mock(Scheduler.class);
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
-
+
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
// try to deploy to the slot
vertex.scheduleForExecution(scheduler, false);
-
+
// will have failed
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
- assertTrue(slot.isReleased());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
-
+
@Test
public void testSlotReleasedWhenScheduledQueued() {
-
try {
- // a slot than cannot be deployed to
- final Instance instance = getInstance(ActorRef.noSender());
- final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
- slot.cancel();
- assertFalse(slot.isReleased());
-
- final SlotAllocationFuture future = new SlotAllocationFuture();
-
final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
-
+
+ // a slot that cannot be deployed to
+ final Instance instance = getInstance(ActorRef.noSender());
+ final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+
+ slot.releaseSlot();
+ assertTrue(slot.isReleased());
+
+ final SlotAllocationFuture future = new SlotAllocationFuture();
+
Scheduler scheduler = mock(Scheduler.class);
when(scheduler.scheduleQueued(Matchers.any(ScheduledUnit.class))).thenReturn(future);
-
+
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
// try to deploy to the slot
vertex.scheduleForExecution(scheduler, true);
-
+
// future has not yet a slot
assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
-
+
future.setSlot(slot);
-
+
// will have failed
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
- assertTrue(slot.isReleased());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
-
+
@Test
public void testScheduleToDeploying() {
try {
+ final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID());
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+ AkkaUtils.getDefaultTimeout());
+
TestingUtils.setCallingThreadDispatcher(system);
ActorRef tm = TestActorRef.create(system, Props.create(ExecutionGraphTestUtils
.SimpleAcknowledgingTaskManager.class));
final Instance instance = getInstance(tm);
- final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
- final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID());
- final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
-
+ final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+
Scheduler scheduler = mock(Scheduler.class);
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
-
+
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
// try to deploy to the slot
@@ -158,4 +157,4 @@ public class ExecutionVertexSchedulingTest {
TestingUtils.setGlobalExecutionContext();
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/017b4f0f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index c0ed629..595ac7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -38,57 +38,52 @@ public class InstanceTest {
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
-
+
Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 4);
-
+
assertEquals(4, instance.getTotalNumberOfSlots());
assertEquals(4, instance.getNumberOfAvailableSlots());
assertEquals(0, instance.getNumberOfAllocatedSlots());
-
+
SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot4 = instance.allocateSimpleSlot(new JobID());
-
+
assertNotNull(slot1);
assertNotNull(slot2);
assertNotNull(slot3);
assertNotNull(slot4);
-
+
assertEquals(0, instance.getNumberOfAvailableSlots());
assertEquals(4, instance.getNumberOfAllocatedSlots());
- assertEquals(6, slot1.getSlotNumber() + slot2.getSlotNumber() +
+ assertEquals(6, slot1.getSlotNumber() + slot2.getSlotNumber() +
slot3.getSlotNumber() + slot4.getSlotNumber());
-
+
// no more slots
assertNull(instance.allocateSimpleSlot(new JobID()));
try {
instance.returnAllocatedSlot(slot2);
fail("instance accepted a non-cancelled slot.");
- } catch (IllegalArgumentException e) {
+ }
+ catch (IllegalArgumentException e) {
// good
}
-
+
// release the slots. this returns them to the instance
slot1.releaseSlot();
slot2.releaseSlot();
- assertFalse(instance.returnAllocatedSlot(slot1));
- assertFalse(instance.returnAllocatedSlot(slot2));
-
- // cancel some slots. this does not release them, yet
- slot3.cancel();
- slot4.cancel();
- assertTrue(instance.returnAllocatedSlot(slot3));
- assertTrue(instance.returnAllocatedSlot(slot4));
-
+ slot3.releaseSlot();
+ slot4.releaseSlot();
+
assertEquals(4, instance.getNumberOfAvailableSlots());
assertEquals(0, instance.getNumberOfAllocatedSlots());
-
+
assertFalse(instance.returnAllocatedSlot(slot1));
assertFalse(instance.returnAllocatedSlot(slot2));
assertFalse(instance.returnAllocatedSlot(slot3));
assertFalse(instance.returnAllocatedSlot(slot4));
-
+
assertEquals(4, instance.getNumberOfAvailableSlots());
assertEquals(0, instance.getNumberOfAllocatedSlots());
}
@@ -97,27 +92,27 @@ public class InstanceTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testInstanceDies() {
try {
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
-
+
Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 3);
-
+
assertEquals(3, instance.getNumberOfAvailableSlots());
-
+
SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
-
+
instance.markDead();
-
+
assertEquals(0, instance.getNumberOfAllocatedSlots());
assertEquals(0, instance.getNumberOfAvailableSlots());
-
+
assertTrue(slot1.isCanceled());
assertTrue(slot2.isCanceled());
assertTrue(slot3.isCanceled());
@@ -127,26 +122,26 @@ public class InstanceTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testCancelAllSlots() {
try {
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
-
+
Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 3);
-
+
assertEquals(3, instance.getNumberOfAvailableSlots());
-
+
SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
-
+
instance.cancelAndReleaseAllSlots();
-
+
assertEquals(3, instance.getNumberOfAvailableSlots());
-
+
assertTrue(slot1.isCanceled());
assertTrue(slot2.isCanceled());
assertTrue(slot3.isCanceled());
@@ -156,7 +151,7 @@ public class InstanceTest {
fail(e.getMessage());
}
}
-
+
/**
* It is crucial for some portions of the code that instance objects do not override equals and
* are only considered equal, if the references are equal.
@@ -172,4 +167,4 @@ public class InstanceTest {
fail(e.getMessage());
}
}
-}
+}
\ No newline at end of file