You are viewing a plain-text version of this content; the hyperlink to the canonical version ("here") was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2016/12/13 17:58:55 UTC
[1/2] hadoop git commit: YARN-2009. CapacityScheduler: Add intra-queue preemption for app priority support. (Sunil G via wangda)
Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 01b50b36b -> 6a18ae849
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index bf19f5a..4ff0bf3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -52,6 +52,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -91,6 +92,9 @@ public class ProportionalCapacityPreemptionPolicy
private boolean observeOnly;
private boolean lazyPreempionEnabled;
+ private float maxAllowableLimitForIntraQueuePreemption;
+ private float minimumThresholdForIntraQueuePreemption;
+
// Pointer to other RM components
private RMContext rmContext;
private ResourceCalculator rc;
@@ -102,6 +106,8 @@ public class ProportionalCapacityPreemptionPolicy
new HashMap<>();
private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions =
new HashMap<>();
+ private Map<String, LinkedHashSet<String>> partitionToUnderServedQueues =
+ new HashMap<String, LinkedHashSet<String>>();
private List<PreemptionCandidatesSelector>
candidatesSelectionPolicies = new ArrayList<>();
private Set<String> allPartitions;
@@ -171,23 +177,44 @@ public class ProportionalCapacityPreemptionPolicy
CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED);
+ maxAllowableLimitForIntraQueuePreemption = csConfig.getFloat(
+ CapacitySchedulerConfiguration.
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+ CapacitySchedulerConfiguration.
+ DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT);
+
+ minimumThresholdForIntraQueuePreemption = csConfig.getFloat(
+ CapacitySchedulerConfiguration.
+ INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD,
+ CapacitySchedulerConfiguration.
+ DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD);
+
rc = scheduler.getResourceCalculator();
nlm = scheduler.getRMContext().getNodeLabelManager();
// Do we need to specially consider reserved containers?
boolean selectCandidatesForResevedContainers = csConfig.getBoolean(
- CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
- CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS);
+ CapacitySchedulerConfiguration.
+ PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
+ CapacitySchedulerConfiguration.
+ DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS);
if (selectCandidatesForResevedContainers) {
- candidatesSelectionPolicies.add(
- new ReservedContainerCandidatesSelector(this));
+ candidatesSelectionPolicies
+ .add(new ReservedContainerCandidatesSelector(this));
}
// initialize candidates preemption selection policies
- candidatesSelectionPolicies.add(
- new FifoCandidatesSelector(this));
+ candidatesSelectionPolicies.add(new FifoCandidatesSelector(this));
+
+ // Do we need to specially consider intra queue
+ boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean(
+ CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
+ CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
+ if (isIntraQueuePreemptionEnabled) {
+ candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this));
+ }
}
-
+
@Override
public ResourceCalculator getResourceCalculator() {
return rc;
@@ -209,6 +236,12 @@ public class ProportionalCapacityPreemptionPolicy
@SuppressWarnings("unchecked")
private void preemptOrkillSelectedContainerAfterWait(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Starting to preempt containers for selectedCandidates and size:"
+ + selectedCandidates.size());
+ }
+
// preempt (or kill) the selected containers
for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates
.entrySet()) {
@@ -233,6 +266,7 @@ public class ProportionalCapacityPreemptionPolicy
// not have to raise another event.
continue;
}
+
//otherwise just send preemption events
rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(appAttemptId, container,
@@ -291,7 +325,6 @@ public class ProportionalCapacityPreemptionPolicy
* @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster
*/
- @SuppressWarnings("unchecked")
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
// Sync killable containers from scheduler when lazy preemption enabled
@@ -537,4 +570,41 @@ public class ProportionalCapacityPreemptionPolicy
Map<String, Map<String, TempQueuePerPartition>> getQueuePartitions() {
return queueToPartitions;
}
+
+  /**
+   * @return the cluster-wide maximum application priority, as reported by
+   *         the scheduler, unwrapped to its integer value
+   */
+  @Override
+  public int getClusterMaxApplicationPriority() {
+    return this.scheduler.getMaxClusterLevelAppPriority().getPriority();
+  }
+
+  /**
+   * @return the intra-queue preemption max-allowable limit loaded from the
+   *         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT configuration setting
+   */
+  @Override
+  public float getMaxAllowableLimitForIntraQueuePreemption() {
+    return this.maxAllowableLimitForIntraQueuePreemption;
+  }
+
+  /**
+   * @return the intra-queue preemption minimum threshold loaded from the
+   *         INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD configuration setting
+   */
+  @Override
+  public float getMinimumThresholdForIntraQueuePreemption() {
+    return this.minimumThresholdForIntraQueuePreemption;
+  }
+
+  /**
+   * Resolves the resource of a node partition through the node label manager,
+   * using a copy of the current cluster resource as the reference value.
+   *
+   * @param partition node partition (label) to resolve
+   * @return a fresh copy of the partition's resource; callers may mutate it
+   */
+  @Override
+  public Resource getPartitionResource(String partition) {
+    Resource clusterCopy = Resources.clone(scheduler.getClusterResource());
+    Resource partitionResource =
+        nlm.getResourceByLabel(partition, clusterCopy);
+    return Resources.clone(partitionResource);
+  }
+
+  /**
+   * Looks up the queues recorded as under-served for a partition.
+   *
+   * @param partition node partition to look up
+   * @return the live (not copied) insertion-ordered set of queue names for
+   *         {@code partition}, or {@code null} if none has been recorded
+   */
+  public LinkedHashSet<String> getUnderServedQueuesPerPartition(
+      String partition) {
+    LinkedHashSet<String> queues = partitionToUnderServedQueues.get(partition);
+    return queues;
+  }
+
+  /**
+   * Records {@code queueName} as under-served in {@code partition}. Queue
+   * names are kept per partition in insertion order; recording the same
+   * queue again leaves the set unchanged.
+   *
+   * @param queueName queue to record
+   * @param partition node partition the queue is recorded under
+   */
+  public void addPartitionToUnderServedQueues(String queueName,
+      String partition) {
+    LinkedHashSet<String> queues = partitionToUnderServedQueues.get(partition);
+    if (queues != null) {
+      queues.add(queueName);
+      return;
+    }
+    // First queue seen for this partition: create its set lazily.
+    queues = new LinkedHashSet<String>();
+    queues.add(queueName);
+    partitionToUnderServedQueues.put(partition, queues);
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
new file mode 100644
index 0000000..fccd2a7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+
+/**
+ * Per-partition snapshot of one application for the preemption policy. The
+ * used, AM-used, reserved and pending resources are handed to
+ * {@link AbstractPreemptionEntity}; this class adds the application's id,
+ * priority and a reference to the scheduler application itself.
+ */
+public class TempAppPerPartition extends AbstractPreemptionEntity {
+
+  // Fixed at construction time and read by the candidate selection policies.
+  private final int priority;
+  private final ApplicationId applicationId;
+
+  FiCaSchedulerApp app;
+
+  TempAppPerPartition(FiCaSchedulerApp app, Resource usedPerPartition,
+      Resource amUsedPerPartition, Resource reserved,
+      Resource pendingPerPartition) {
+    super(app.getQueueName(), usedPerPartition, amUsedPerPartition, reserved,
+        pendingPerPartition);
+    this.app = app;
+    this.applicationId = app.getApplicationId();
+    this.priority = app.getPriority().getPriority();
+  }
+
+  /** @return the scheduler application this snapshot was built from */
+  public FiCaSchedulerApp getFiCaSchedulerApp() {
+    return this.app;
+  }
+
+  /**
+   * Adds {@code killable} in place to this application's ideal preemption
+   * amount ({@code toBePreempted}).
+   *
+   * @param killable resource to add to the ideal preemption amount
+   */
+  public void assignPreemption(Resource killable) {
+    Resources.addTo(toBePreempted, killable);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(" NAME: ").append(getApplicationId());
+    sb.append(" PRIO: ").append(priority);
+    sb.append(" CUR: ").append(getUsed());
+    sb.append(" PEN: ").append(pending);
+    sb.append(" RESERVED: ").append(reserved);
+    sb.append(" IDEAL_ASSIGNED: ").append(idealAssigned);
+    sb.append(" PREEMPT_OTHER: ").append(getToBePreemptFromOther());
+    sb.append(" IDEAL_PREEMPT: ").append(toBePreempted);
+    sb.append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted());
+    sb.append("\n");
+    return sb.toString();
+  }
+
+  /**
+   * Appends a comma-separated row: the queue name followed by memory and
+   * vcores of used, pending, ideal-assigned, ideal-preempt and
+   * actual-preempt resources, in that order.
+   *
+   * @param sb builder to append to
+   */
+  void appendLogString(StringBuilder sb) {
+    Resource used = getUsed();
+    Resource actual = getActuallyToBePreempted();
+    Resource[] columns =
+        new Resource[] {used, pending, idealAssigned, toBePreempted, actual};
+    sb.append(queueName);
+    for (Resource column : columns) {
+      sb.append(", ").append(column.getMemorySize());
+      sb.append(", ").append(column.getVirtualCores());
+    }
+  }
+
+  /** @return the application priority captured at construction */
+  public int getPriority() {
+    return this.priority;
+  }
+
+  /** @return the id of the application this snapshot describes */
+  public ApplicationId getApplicationId() {
+    return this.applicationId;
+  }
+
+  /**
+   * Subtracts {@code toBeDeduct} in place from the actually-to-be-preempted
+   * resource, but only when that resource is strictly greater than
+   * {@code toBeDeduct}; otherwise it is left unchanged (not clamped to zero).
+   * The {@code partition} argument is currently unused.
+   */
+  public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator,
+      Resource cluster, Resource toBeDeduct, String partition) {
+    Resource actual = getActuallyToBePreempted();
+    if (!Resources.greaterThan(resourceCalculator, cluster, actual,
+        toBeDeduct)) {
+      return;
+    }
+    Resources.subtractFrom(actual, toBeDeduct);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 04ed135..28099c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -25,34 +25,29 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
+import java.util.Collection;
/**
* Temporary data-structure tracking resource availability, pending resource
* need, current utilization. This is per-queue-per-partition data structure
*/
-public class TempQueuePerPartition {
+public class TempQueuePerPartition extends AbstractPreemptionEntity {
// Following fields are copied from scheduler
- final String queueName;
final String partition;
- final Resource pending;
- private final Resource current;
private final Resource killable;
- private final Resource reserved;
private final float absCapacity;
private final float absMaxCapacity;
final Resource totalPartitionResource;
- // Following fields are setted and used by candidate selection policies
- Resource idealAssigned;
- Resource toBePreempted;
+ // Following fields are settled and used by candidate selection policies
Resource untouchableExtra;
Resource preemptableExtra;
- private Resource actuallyToBePreempted;
double normalizedGuarantee;
final ArrayList<TempQueuePerPartition> children;
+ private Collection<TempAppPerPartition> apps;
LeafQueue leafQueue;
boolean preemptionDisabled;
@@ -60,8 +55,8 @@ public class TempQueuePerPartition {
boolean preemptionDisabled, String partition, Resource killable,
float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
Resource reserved, CSQueue queue) {
- this.queueName = queueName;
- this.current = current;
+ super(queueName, current, Resource.newInstance(0, 0), reserved,
+ Resource.newInstance(0, 0));
if (queue instanceof LeafQueue) {
LeafQueue l = (LeafQueue) queue;
@@ -72,11 +67,9 @@ public class TempQueuePerPartition {
pending = Resources.createResource(0);
}
- this.idealAssigned = Resource.newInstance(0, 0);
- this.actuallyToBePreempted = Resource.newInstance(0, 0);
- this.toBePreempted = Resource.newInstance(0, 0);
this.normalizedGuarantee = Float.NaN;
this.children = new ArrayList<>();
+ this.apps = new ArrayList<>();
this.untouchableExtra = Resource.newInstance(0, 0);
this.preemptableExtra = Resource.newInstance(0, 0);
this.preemptionDisabled = preemptionDisabled;
@@ -85,7 +78,6 @@ public class TempQueuePerPartition {
this.absCapacity = absCapacity;
this.absMaxCapacity = absMaxCapacity;
this.totalPartitionResource = totalPartitionResource;
- this.reserved = reserved;
}
public void setLeafQueue(LeafQueue l) {
@@ -95,7 +87,9 @@ public class TempQueuePerPartition {
/**
* When adding a child we also aggregate its pending resource needs.
- * @param q the child queue to add to this queue
+ *
+ * @param q
+ * the child queue to add to this queue
*/
public void addChild(TempQueuePerPartition q) {
assert leafQueue == null;
@@ -103,14 +97,10 @@ public class TempQueuePerPartition {
Resources.addTo(pending, q.pending);
}
- public ArrayList<TempQueuePerPartition> getChildren(){
+ public ArrayList<TempQueuePerPartition> getChildren() {
return children;
}
- public Resource getUsed() {
- return current;
- }
-
public Resource getUsedDeductReservd() {
return Resources.subtract(current, reserved);
}
@@ -122,28 +112,30 @@ public class TempQueuePerPartition {
Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
Resources.subtract(getMax(), idealAssigned),
Resource.newInstance(0, 0));
- // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
+ // remain = avail - min(avail, (max - assigned), (current + pending -
+ // assigned))
Resource accepted = Resources.min(rc, clusterResource,
- absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail,
- Resources
- /*
- * When we're using FifoPreemptionSelector
- * (considerReservedResource = false).
- *
- * We should deduct reserved resource to avoid excessive preemption:
- *
- * For example, if an under-utilized queue has used = reserved = 20.
- * Preemption policy will try to preempt 20 containers
- * (which is not satisfied) from different hosts.
- *
- * In FifoPreemptionSelector, there's no guarantee that preempted
- * resource can be used by pending request, so policy will preempt
- * resources repeatly.
- */
- .subtract(Resources.add(
- (considersReservedResource ? getUsed() :
- getUsedDeductReservd()),
- pending), idealAssigned)));
+ absMaxCapIdealAssignedDelta,
+ Resources.min(rc, clusterResource, avail, Resources
+ /*
+ * When we're using FifoPreemptionSelector (considerReservedResource
+ * = false).
+ *
+ * We should deduct reserved resource to avoid excessive preemption:
+ *
+ * For example, if an under-utilized queue has used = reserved = 20.
+ * Preemption policy will try to preempt 20 containers (which is not
+ * satisfied) from different hosts.
+ *
+ * In FifoPreemptionSelector, there's no guarantee that preempted
+ * resource can be used by pending request, so policy will preempt
+ * resources repeatly.
+ */
+ .subtract(
+ Resources.add((considersReservedResource
+ ? getUsed()
+ : getUsedDeductReservd()), pending),
+ idealAssigned)));
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);
return remain;
@@ -162,8 +154,7 @@ public class TempQueuePerPartition {
untouchableExtra = Resources.none();
preemptableExtra = Resources.none();
- Resource extra = Resources.subtract(getUsed(),
- getGuaranteed());
+ Resource extra = Resources.subtract(getUsed(), getGuaranteed());
if (Resources.lessThan(rc, totalPartitionResource, extra,
Resources.none())) {
extra = Resources.none();
@@ -197,26 +188,21 @@ public class TempQueuePerPartition {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append(" NAME: " + queueName)
- .append(" CUR: ").append(current)
- .append(" PEN: ").append(pending)
- .append(" RESERVED: ").append(reserved)
- .append(" GAR: ").append(getGuaranteed())
- .append(" NORM: ").append(normalizedGuarantee)
- .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
- .append(" IDEAL_PREEMPT: ").append(toBePreempted)
- .append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted)
+ sb.append(" NAME: " + queueName).append(" CUR: ").append(current)
+ .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
+ .append(" GAR: ").append(getGuaranteed()).append(" NORM: ")
+ .append(normalizedGuarantee).append(" IDEAL_ASSIGNED: ")
+ .append(idealAssigned).append(" IDEAL_PREEMPT: ").append(toBePreempted)
+ .append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted())
.append(" UNTOUCHABLE: ").append(untouchableExtra)
- .append(" PREEMPTABLE: ").append(preemptableExtra)
- .append("\n");
+ .append(" PREEMPTABLE: ").append(preemptableExtra).append("\n");
return sb.toString();
}
public void assignPreemption(float scalingFactor, ResourceCalculator rc,
Resource clusterResource) {
- Resource usedDeductKillable = Resources.subtract(
- getUsed(), killable);
+ Resource usedDeductKillable = Resources.subtract(getUsed(), killable);
Resource totalResource = Resources.add(getUsed(), pending);
// The minimum resource that we need to keep for a queue is:
@@ -224,7 +210,8 @@ public class TempQueuePerPartition {
//
// Doing this because when we calculate ideal allocation doesn't consider
// reserved resource, ideal-allocation calculated could be less than
- // guaranteed and total. We should avoid preempt from a queue if it is already
+ // guaranteed and total. We should avoid preempt from a queue if it is
+ // already
// <= its guaranteed resource.
Resource minimumQueueResource = Resources.max(rc, clusterResource,
Resources.min(rc, clusterResource, totalResource, getGuaranteed()),
@@ -233,33 +220,26 @@ public class TempQueuePerPartition {
if (Resources.greaterThan(rc, clusterResource, usedDeductKillable,
minimumQueueResource)) {
toBePreempted = Resources.multiply(
- Resources.subtract(usedDeductKillable, minimumQueueResource), scalingFactor);
+ Resources.subtract(usedDeductKillable, minimumQueueResource),
+ scalingFactor);
} else {
toBePreempted = Resources.none();
}
}
- public Resource getActuallyToBePreempted() {
- return actuallyToBePreempted;
- }
-
- public void setActuallyToBePreempted(Resource res) {
- this.actuallyToBePreempted = res;
- }
-
public void deductActuallyToBePreempted(ResourceCalculator rc,
Resource cluster, Resource toBeDeduct) {
- if (Resources.greaterThan(rc, cluster, actuallyToBePreempted, toBeDeduct)) {
- Resources.subtractFrom(actuallyToBePreempted, toBeDeduct);
+ if (Resources.greaterThan(rc, cluster, getActuallyToBePreempted(),
+ toBeDeduct)) {
+ Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);
}
- actuallyToBePreempted = Resources.max(rc, cluster, actuallyToBePreempted,
- Resources.none());
+ setActuallyToBePreempted(Resources.max(rc, cluster,
+ getActuallyToBePreempted(), Resources.none()));
}
void appendLogString(StringBuilder sb) {
- sb.append(queueName).append(", ")
- .append(current.getMemorySize()).append(", ")
- .append(current.getVirtualCores()).append(", ")
+ sb.append(queueName).append(", ").append(current.getMemorySize())
+ .append(", ").append(current.getVirtualCores()).append(", ")
.append(pending.getMemorySize()).append(", ")
.append(pending.getVirtualCores()).append(", ")
.append(getGuaranteed().getMemorySize()).append(", ")
@@ -267,9 +247,17 @@ public class TempQueuePerPartition {
.append(idealAssigned.getMemorySize()).append(", ")
.append(idealAssigned.getVirtualCores()).append(", ")
.append(toBePreempted.getMemorySize()).append(", ")
- .append(toBePreempted.getVirtualCores() ).append(", ")
- .append(actuallyToBePreempted.getMemorySize()).append(", ")
- .append(actuallyToBePreempted.getVirtualCores());
+ .append(toBePreempted.getVirtualCores()).append(", ")
+ .append(getActuallyToBePreempted().getMemorySize()).append(", ")
+ .append(getActuallyToBePreempted().getVirtualCores());
+ }
+
+  /**
+   * Replaces (does not append to) this queue's application snapshots. The
+   * given collection is stored by reference, not copied.
+   *
+   * @param orderedApps application snapshots for this queue and partition
+   */
+  public void addAllApps(Collection<TempAppPerPartition> orderedApps) {
+    apps = orderedApps;
+  }
+
+  /**
+   * @return the application snapshots last passed to {@code addAllApps}, or
+   *         the empty list created by the constructor if none were set
+   */
+  public Collection<TempAppPerPartition> getApps() {
+    return this.apps;
 }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 6db5074..cea5aa4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1045,6 +1045,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
private static final String PREEMPTION_CONFIG_PREFIX =
"yarn.resourcemanager.monitor.capacity.preemption.";
+ private static final String INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX =
+ "intra-queue-preemption.";
+
/** If true, run the policy but do not affect the cluster with preemption and
* kill events. */
public static final String PREEMPTION_OBSERVE_ONLY =
@@ -1098,4 +1101,32 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers";
public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
false;
+
+ /**
+ * Whether intra-queue preemption is enabled; full key is
+ * {@code yarn.resourcemanager.monitor.capacity.preemption.intra-queue-preemption.enabled}.
+ * When true, the preemption policy registers an intra-queue candidates
+ * selector after the reserved-container and FIFO selectors, so
+ * priority/user-limit/fairness based selectors can preempt containers
+ * between applications of the same queue.
+ */
+ public static final String INTRAQUEUE_PREEMPTION_ENABLED =
+ PREEMPTION_CONFIG_PREFIX +
+ INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "enabled";
+ /** Intra-queue preemption is disabled by default. */
+ public static final boolean DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED = false;
+
+ /**
+ * For intra-queue preemption, only queues whose used capacity is above this
+ * threshold are considered; full key is
+ * {@code yarn.resourcemanager.monitor.capacity.preemption.intra-queue-preemption.minimum-threshold}.
+ * NOTE(review): presumably a fraction of the queue's capacity; confirm
+ * against the intra-queue candidates selector.
+ */
+ public static final String INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD =
+ PREEMPTION_CONFIG_PREFIX +
+ INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "minimum-threshold";
+ public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD =
+ 0.5f;
+
+ /**
+ * For intra-queue preemption, the maximum amount that may be preempted per
+ * queue; full key is
+ * {@code yarn.resourcemanager.monitor.capacity.preemption.intra-queue-preemption.max-allowable-limit}.
+ * NOTE(review): presumably a fraction of the queue's capacity applied per
+ * preemption round; confirm against the intra-queue candidates selector.
+ */
+ public static final String INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
+ PREEMPTION_CONFIG_PREFIX +
+ INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "max-allowable-limit";
+ public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
+ 0.2f;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 20378bb..eef43c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -73,6 +73,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -1029,8 +1030,9 @@ public class LeafQueue extends AbstractCSQueue {
Resource clusterResource, FiCaSchedulerApp application,
String partition) {
return getHeadroom(user, queueCurrentLimit, clusterResource,
- computeUserLimit(application, clusterResource, user, partition,
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition);
+ computeUserLimit(application.getUser(), clusterResource, user,
+ partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+ partition);
}
private Resource getHeadroom(User user,
@@ -1101,7 +1103,7 @@ public class LeafQueue extends AbstractCSQueue {
// Compute user limit respect requested labels,
// TODO, need consider headroom respect labels also
Resource userLimit =
- computeUserLimit(application, clusterResource, queueUser,
+ computeUserLimit(application.getUser(), clusterResource, queueUser,
nodePartition, schedulingMode);
setQueueResourceLimitsInfo(clusterResource);
@@ -1139,7 +1141,7 @@ public class LeafQueue extends AbstractCSQueue {
}
@Lock(NoLock.class)
- private Resource computeUserLimit(FiCaSchedulerApp application,
+ private Resource computeUserLimit(String userName,
Resource clusterResource, User user,
String nodePartition, SchedulingMode schedulingMode) {
Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
@@ -1239,7 +1241,6 @@ public class LeafQueue extends AbstractCSQueue {
minimumAllocation);
if (LOG.isDebugEnabled()) {
- String userName = application.getUser();
LOG.debug("User limit computation for " + userName +
" in queue " + getQueueName() +
" userLimitPercent=" + userLimit +
@@ -1815,11 +1816,22 @@ public class LeafQueue extends AbstractCSQueue {
/**
* Obtain (read-only) collection of active applications.
*/
- public Collection<FiCaSchedulerApp> getApplications() {
+ public synchronized Collection<FiCaSchedulerApp> getApplications() {
return Collections.unmodifiableCollection(orderingPolicy
.getSchedulableEntities());
}
+  /**
+   * Returns a read-only snapshot of the applications held by both
+   * {@code pendingOrderingPolicy} and {@code orderingPolicy}. The snapshot is
+   * a set, so an application present in both appears once.
+   *
+   * @return unmodifiable snapshot of all applications in this queue
+   */
+  public synchronized Collection<FiCaSchedulerApp> getAllApplications() {
+    Collection<FiCaSchedulerApp> pendingApps =
+        pendingOrderingPolicy.getSchedulableEntities();
+    HashSet<FiCaSchedulerApp> snapshot =
+        new HashSet<FiCaSchedulerApp>(pendingApps);
+    Collection<FiCaSchedulerApp> activeApps =
+        orderingPolicy.getSchedulableEntities();
+    snapshot.addAll(activeApps);
+    return Collections.unmodifiableCollection(snapshot);
+  }
+
// Consider the headroom for each user in the queue.
// Total pending for the queue =
// sum(for each user(min((user's headroom), sum(user's pending requests))))
@@ -1833,7 +1845,7 @@ public class LeafQueue extends AbstractCSQueue {
if (!userNameToHeadroom.containsKey(userName)) {
User user = getUser(userName);
Resource headroom = Resources.subtract(
- computeUserLimit(app, resources, user, partition,
+ computeUserLimit(app.getUser(), resources, user, partition,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
user.getUsed(partition));
// Make sure headroom is not negative.
@@ -1851,6 +1863,16 @@ public class LeafQueue extends AbstractCSQueue {
return pendingConsideringUserLimit;
}
+  /**
+   * Computes the user limit of {@code userName} in this queue for the given
+   * partition, respecting partition exclusivity.
+   *
+   * @param userName user whose limit is computed
+   * @param resources cluster resource passed to the user-limit computation
+   * @param partition node partition (label) the limit is computed for
+   * @return the computed user limit
+   */
+  public synchronized Resource getUserLimitPerUser(String userName,
+      Resource resources, String partition) {
+    return computeUserLimit(userName, resources, getUser(userName),
+        partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+  }
+
@Override
public synchronized void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
@@ -1901,8 +1923,8 @@ public class LeafQueue extends AbstractCSQueue {
}
/**
- * return all ignored partition exclusivity RMContainers in the LeafQueue, this
- * will be used by preemption policy, and use of return
+ * @return all ignored partition exclusivity RMContainers in the LeafQueue,
+ * this will be used by preemption policy, and use of return
* ignorePartitionExclusivityRMContainer should protected by LeafQueue
* synchronized lock
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 38995ff..b7b47b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -312,6 +313,39 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
return ret;
}
+ /**
+ * Sums this application's pending ANY-location requests per node partition.
+ *
+ * @return map from partition (the request's node label expression, with
+ * {@code null} normalized to "") to the total pending resource, i.e.
+ * capability * numContainers summed over all priorities
+ */
+ public synchronized Map<String, Resource> getTotalPendingRequestsPerPartition() {
+ Map<String, Resource> ret = new HashMap<String, Resource>();
+ for (Priority priority : appSchedulingInfo.getPriorities()) {
+ ResourceRequest rr =
+ appSchedulingInfo.getResourceRequest(priority, ResourceRequest.ANY);
+ // Guard against a priority with no ANY request instead of throwing NPE.
+ if (rr == null) {
+ continue;
+ }
+ // Treat a missing label expression as the default ("") partition so it
+ // is not stored under a null key.
+ String partition = rr.getNodeLabelExpression();
+ if (partition == null) {
+ partition = "";
+ }
+ Resource res = ret.get(partition);
+ if (res == null) {
+ res = Resources.createResource(0, 0);
+ ret.put(partition, res);
+ }
+ Resources.addTo(res,
+ Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+ }
+ return ret;
+ }
+
public synchronized void markContainerForPreemption(ContainerId cont) {
// ignore already completed containers
if (liveContainers.containsKey(cont)) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index e60c384..3c1d9f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -63,11 +63,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
@@ -160,13 +163,17 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
mClock);
}
- private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
- String queueName, List<RMContainer> reservedContainers,
- List<RMContainer> liveContainers) {
+ private void mockContainers(String containersConfig, FiCaSchedulerApp app,
+ ApplicationAttemptId attemptId, String queueName,
+ List<RMContainer> reservedContainers, List<RMContainer> liveContainers) {
int containerId = 1;
int start = containersConfig.indexOf("=") + 1;
int end = -1;
+ Resource used = Resource.newInstance(0, 0);
+ Resource pending = Resource.newInstance(0, 0);
+ Priority pri = Priority.newInstance(0);
+
while (start < containersConfig.length()) {
while (start < containersConfig.length()
&& containersConfig.charAt(start) != '(') {
@@ -188,41 +195,50 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
// now we found start/end, get container values
String[] values = containersConfig.substring(start + 1, end).split(",");
- if (values.length != 6) {
+ if (values.length < 6 || values.length > 8) {
throw new IllegalArgumentException("Format to define container is:"
- + "(priority,resource,host,expression,repeat,reserved)");
+ + "(priority,resource,host,expression,repeat,reserved, pending)");
}
- Priority pri = Priority.newInstance(Integer.valueOf(values[0]));
+ pri.setPriority(Integer.valueOf(values[0]));
Resource res = parseResourceFromString(values[1]);
NodeId host = NodeId.newInstance(values[2], 1);
- String exp = values[3];
+ String label = values[3];
+ String userName = "user";
int repeat = Integer.valueOf(values[4]);
boolean reserved = Boolean.valueOf(values[5]);
+ if (values.length >= 7) {
+ Resources.addTo(pending, parseResourceFromString(values[6]));
+ }
+ if (values.length == 8) {
+ userName = values[7];
+ }
for (int i = 0; i < repeat; i++) {
Container c = mock(Container.class);
+ Resources.addTo(used, res);
when(c.getResource()).thenReturn(res);
when(c.getPriority()).thenReturn(pri);
RMContainerImpl rmc = mock(RMContainerImpl.class);
when(rmc.getAllocatedNode()).thenReturn(host);
- when(rmc.getNodeLabelExpression()).thenReturn(exp);
+ when(rmc.getNodeLabelExpression()).thenReturn(label);
when(rmc.getAllocatedResource()).thenReturn(res);
when(rmc.getContainer()).thenReturn(c);
when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
when(rmc.getQueueName()).thenReturn(queueName);
- final ContainerId cId = ContainerId.newContainerId(attemptId, containerId);
- when(rmc.getContainerId()).thenReturn(
- cId);
+ final ContainerId cId = ContainerId.newContainerId(attemptId,
+ containerId);
+ when(rmc.getContainerId()).thenReturn(cId);
doAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock invocation) throws Throwable {
- return cId.compareTo(((RMContainer) invocation.getArguments()[0])
- .getContainerId());
+ return cId.compareTo(
+ ((RMContainer) invocation.getArguments()[0]).getContainerId());
}
}).when(rmc).compareTo(any(RMContainer.class));
if (containerId == 1) {
when(rmc.isAMContainer()).thenReturn(true);
+ when(app.getAMResource(label)).thenReturn(res);
}
if (reserved) {
@@ -237,25 +253,44 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
// If this is a non-exclusive allocation
String partition = null;
- if (exp.isEmpty()
+ if (label.isEmpty()
&& !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
- .isEmpty()) {
+ .isEmpty()) {
LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
- Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers =
- queue.getIgnoreExclusivityRMContainers();
+ Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = queue
+ .getIgnoreExclusivityRMContainers();
if (!ignoreExclusivityContainers.containsKey(partition)) {
ignoreExclusivityContainers.put(partition,
new TreeSet<RMContainer>());
}
ignoreExclusivityContainers.get(partition).add(rmc);
}
- LOG.debug("add container to app=" + attemptId + " res=" + res
- + " node=" + host + " nodeLabelExpression=" + exp + " partition="
+ LOG.debug("add container to app=" + attemptId + " res=" + res + " node="
+ + host + " nodeLabelExpression=" + label + " partition="
+ partition);
containerId++;
}
+ // Some more app specific aggregated data can be better filled here.
+ when(app.getPriority()).thenReturn(pri);
+ when(app.getUser()).thenReturn(userName);
+ when(app.getCurrentConsumption()).thenReturn(used);
+ when(app.getCurrentReservation())
+ .thenReturn(Resources.createResource(0, 0));
+
+ Map<String, Resource> pendingForDefaultPartition =
+ new HashMap<String, Resource>();
+ // Add for default partition for now.
+ pendingForDefaultPartition.put(label, pending);
+ when(app.getTotalPendingRequestsPerPartition())
+ .thenReturn(pendingForDefaultPartition);
+
+ // need to set pending resource in resource usage as well
+ ResourceUsage ru = new ResourceUsage();
+ ru.setUsed(label, used);
+ when(app.getAppAttemptResourceUsage()).thenReturn(ru);
+
start = end + 1;
}
}
@@ -271,6 +306,8 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
*/
private void mockApplications(String appsConfig) {
int id = 1;
+ HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>();
+ LeafQueue queue = null;
for (String a : appsConfig.split(";")) {
String[] strs = a.split("\t");
String queueName = strs[0];
@@ -279,24 +316,49 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
List<RMContainer> liveContainers = new ArrayList<RMContainer>();
List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
ApplicationId appId = ApplicationId.newInstance(0L, id);
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId
+ .newInstance(appId, 1);
- mockContainers(strs[1], appAttemptId, queueName, reservedContainers,
+ FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+ when(app.getAMResource(anyString()))
+ .thenReturn(Resources.createResource(0, 0));
+ mockContainers(strs[1], app, appAttemptId, queueName, reservedContainers,
liveContainers);
+ LOG.debug("Application mock: queue: " + queueName + ", appId:" + appId);
- FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
when(app.getLiveContainers()).thenReturn(liveContainers);
when(app.getReservedContainers()).thenReturn(reservedContainers);
when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
when(app.getApplicationId()).thenReturn(appId);
- when(app.getPriority()).thenReturn(Priority.newInstance(0));
// add to LeafQueue
- LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+ queue = (LeafQueue) nameToCSQueues.get(queueName);
queue.getApplications().add(app);
+ queue.getAllApplications().add(app);
+ HashSet<String> users = userMap.get(queueName);
+ if (null == users) {
+ users = new HashSet<String>();
+ userMap.put(queueName, users);
+ }
+
+ users.add(app.getUser());
id++;
}
+
+ for (String queueName : userMap.keySet()) {
+ queue = (LeafQueue) nameToCSQueues.get(queueName);
+ // Currently we have user-limit test support only for default label.
+ Resource totResoucePerPartition = partitionToResource.get("");
+ Resource capacity = Resources.multiply(totResoucePerPartition,
+ queue.getQueueCapacities().getAbsoluteCapacity());
+ HashSet<String> users = userMap.get(queue.getQueueName());
+ Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
+ for (String user : users) {
+ when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
+ anyString())).thenReturn(userLimit);
+ }
+ }
}
private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container,
@@ -430,10 +492,18 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
new Comparator<FiCaSchedulerApp>() {
@Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
- return a1.getApplicationId().compareTo(a2.getApplicationId());
+ if (a1.getPriority() != null
+ && !a1.getPriority().equals(a2.getPriority())) {
+ return a1.getPriority().compareTo(a2.getPriority());
+ }
+
+ int res = a1.getApplicationId()
+ .compareTo(a2.getApplicationId());
+ return res;
}
});
when(leafQueue.getApplications()).thenReturn(apps);
+ when(leafQueue.getAllApplications()).thenReturn(apps);
OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
when(so.getPreemptionIterator()).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
@@ -512,10 +582,15 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
float absUsed = Resources.divide(rc, totResoucePerPartition,
parseResourceFromString(values[2].trim()), totResoucePerPartition)
+ epsilon;
+ float used = Resources.divide(rc, totResoucePerPartition,
+ parseResourceFromString(values[2].trim()),
+ parseResourceFromString(values[0].trim())) + epsilon;
Resource pending = parseResourceFromString(values[3].trim());
qc.setAbsoluteCapacity(partitionName, absGuaranteed);
qc.setAbsoluteMaximumCapacity(partitionName, absMax);
qc.setAbsoluteUsedCapacity(partitionName, absUsed);
+ qc.setUsedCapacity(partitionName, used);
+ when(queue.getUsedCapacity()).thenReturn(used);
ru.setPending(partitionName, pending);
if (!isParent(queueExprArray, idx)) {
LeafQueue lq = (LeafQueue) queue;
@@ -530,6 +605,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
reserved = parseResourceFromString(values[4].trim());
ru.setReserved(partitionName, reserved);
}
+
LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+ " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
+ ",abs_used" + absUsed + ",pending_resource=" + pending
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
new file mode 100644
index 0000000..19fb0d2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
@@ -0,0 +1,868 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test class for IntraQueuePreemption scenarios.
+ */
+public class TestProportionalCapacityPreemptionPolicyIntraQueue
+ extends
+ ProportionalCapacityPreemptionPolicyMockFramework {
+ @Before
+ public void setup() {
+ super.setup();
+ conf.setBoolean(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
+ true); // enable intra-queue preemption before rebuilding the policy
+ policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+ }
+
+ @Test
+ public void testSimpleIntraQueuePreemption() throws IOException {
+ /**
+ * Simplest intra-queue preemption scenario. Queue structure:
+ *
+ * <pre>
+ * root
+ * / | | \
+ * a b c d
+ * </pre>
+ *
+ * Guaranteed resources of a/b/c/d are 11:40:20:29; total cluster resource
+ * is 100.
+ * Scenario:
+ * Queue b runs several apps, and its higher-priority apps (app5, app6)
+ * have pending demand. Resources are preempted from the apps running at
+ * the lower priority (4) to meet that demand.
+ */
+ String labelsConfig = "=100,true;";
+ String nodesConfig = // n1 has no label
+ "n1= res=100";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 80 120 0]);" + // root
+ "-a(=[11 100 11 50 0]);" + // a
+ "-b(=[40 100 38 60 0]);" + // b
+ "-c(=[20 100 10 10 0]);" + // c
+ "-d(=[29 100 20 0 0])"; // d
+
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,
+ // pending)
+ "a\t" // app1 in a
+ + "(1,1,n1,,6,false,25);" + // priority 1, 6 containers
+ "a\t" // app2 in a
+ + "(1,1,n1,,5,false,25);" + // priority 1, 5 containers
+ "b\t" // app3 in b
+ + "(4,1,n1,,34,false,20);" + // priority 4, 34 containers
+ "b\t" // app4 in b
+ + "(4,1,n1,,2,false,10);" + // priority 4, 2 containers
+ "b\t" // app5 in b
+ + "(5,1,n1,,1,false,10);" + // priority 5, 1 container
+ "b\t" // app6 in b
+ + "(6,1,n1,,1,false,10);" + // priority 6, 1 container
+ "c\t" // app7 in c
+ + "(1,1,n1,,10,false,10);" + "d\t" // app8 in d
+ + "(1,1,n1,,20,false,0)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // In queue b, app3 and app4 run at the lowest priority (4), so 8
+ // containers in total are preempted from them: 7 from app3, 1 from app4.
+ verify(mDisp, times(1))
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(4))));
+ verify(mDisp, times(7))
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+ }
+
+ @Test
+ public void testNoPreemptionForSamePriorityApps() throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / | | \
+ * a b c d
+ * </pre>
+ *
+ * Guaranteed resources of a/b/c/d are 10:40:20:30; total cluster resource
+ * is 100.
+ * Scenario: in queues a and b every app runs at the same priority, and
+ * those apps still have plenty of pending demand. Because no app outranks
+ * another, no intra-queue preemption should occur.
+ */
+ String labelsConfig = "=100,true;";
+ String nodesConfig = // n1 has no label
+ "n1= res=100";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 80 120 0]);" + // root
+ "-a(=[10 100 10 50 0]);" + // a
+ "-b(=[40 100 40 60 0]);" + // b
+ "-c(=[20 100 10 10 0]);" + // c
+ "-d(=[30 100 20 0 0])"; // d
+
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,
+ // pending)
+ "a\t" // app1 in a
+ + "(1,1,n1,,6,false,25);" + // priority 1, 6 containers
+ "a\t" // app2 in a
+ + "(1,1,n1,,5,false,25);" + // priority 1, 5 containers
+ "b\t" // app3 in b
+ + "(1,1,n1,,34,false,20);" + // priority 1, 34 containers
+ "b\t" // app4 in b
+ + "(1,1,n1,,2,false,10);" + // priority 1, 2 containers
+ "b\t" // app5 in b
+ + "(1,1,n1,,1,false,20);" + // priority 1, 1 container
+ "b\t" // app6 in b
+ + "(1,1,n1,,1,false,10);" + // priority 1, 1 container
+ "c\t" // app7 in c
+ + "(1,1,n1,,10,false,10);" + "d\t" // app8 in d
+ + "(1,1,n1,,20,false,0)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // No app in queue b may lose a container.
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(4))));
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(5))));
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(6))));
+ }
+
+ @Test
+ public void testNoPreemptionWhenQueueIsUnderCapacityLimit()
+ throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ *
+ * Scenario:
+ * Guaranteed resources of a/b are 40:60; total cluster resource is 100.
+ * By default this limit is 50%: no preemption is expected because the
+ * used capacity of both a and b stays under 50%, even though
+ * higher-priority apps in each queue have pending demand.
+ */
+ String labelsConfig = "=100,true;";
+ String nodesConfig = // n1 has no label
+ "n1= res=100";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 35 80 0]);" + // root
+ "-a(=[40 100 10 50 0]);" + // a
+ "-b(=[60 100 25 30 0])"; // b
+
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+ "a\t" // app1 in a
+ + "(1,1,n1,,5,false,25);" + // priority 1
+ "a\t" // app2 in a
+ + "(2,1,n1,,5,false,25);" + // priority 2
+ "b\t" // app3 in b
+ + "(4,1,n1,,40,false,20);" + // priority 4
+ "b\t" // app4 in b
+ + "(6,1,n1,,5,false,20)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // Used capacity of a and b is under 50%, so no app in either queue is
+ // preempted.
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(4))));
+ }
+
+ @Test
+ public void testLimitPreemptionWithMaxIntraQueuePreemptableLimit()
+ throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ *
+ * Guaranteed resources of a/b are 40:60; total cluster resource is 100.
+ * The max allowable intra-queue preemption limit is set to 50% below.
+ * Verify that preemption is capped at that limit even though the pending
+ * demand is larger.
+ */
+
+ // Cap intra-queue preemption at 50%.
+ conf.setFloat(
+ CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+ 0.5f);
+
+ String labelsConfig = "=100,true;";
+ String nodesConfig = // n1 has no label
+ "n1= res=100";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 55 170 0]);" + // root
+ "-a(=[40 100 10 50 0]);" + // a
+ "-b(=[60 100 45 120 0])"; // b
+
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+ "a\t" // app1 in a
+ + "(1,1,n1,,5,false,25);" + // priority 1
+ "a\t" // app2 in a
+ + "(2,1,n1,,5,false,25);" + // priority 2
+ "b\t" // app3 in b
+ + "(4,1,n1,,40,false,20);" + // priority 4
+ "b\t" // app4 in b
+ + "(6,1,n1,,5,false,100)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // In queue b, app4 needs 100 resources but only 30 are preempted from
+ // app3: the cap is 50% of the queue's guaranteed capacity (60), as set
+ // by INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT above.
+ verify(mDisp, times(30))
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+ }
+
+ @Test
+ public void testLimitPreemptionWithTotalPreemptedResourceAllowed()
+ throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ *
+ * Scenario:
+ * Guaranteed resources of a/b are 40:60; total cluster resource is 100.
+ * Total preemption per round is limited to 10%. Verify that only 10% is
+ * preempted.
+ */
+
+ // Limit total preemption per round to 10%.
+ conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+ 0.1f);
+
+ String labelsConfig = "=100,true;";
+ String nodesConfig = // n1 has no label
+ "n1= res=100";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 55 170 0]);" + // root
+ "-a(=[40 100 10 50 0]);" + // a
+ "-b(=[60 100 45 120 0])"; // b
+
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+ "a\t" // app1 in a
+ + "(1,1,n1,,5,false,25);" + // priority 1
+ "a\t" // app2 in a
+ + "(2,1,n1,,5,false,25);" + // priority 2
+ "b\t" // app3 in b
+ + "(4,1,n1,,40,false,20);" + // priority 4
+ "b\t" // app4 in b
+ + "(6,1,n1,,5,false,100)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // In queue b, app4 needs 100 resources but only 10 are preempted from
+ // app3, because of the 10% TOTAL_PREEMPTION_PER_ROUND limit.
+ verify(mDisp, times(10))
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+ }
+
+ @Test
+ public void testAlreadySelectedContainerFromInterQueuePreemption()
+ throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ *
+ * Scenario:
+ * Guaranteed resources of a/b are 60:40; total cluster resource is 100.
+ * Queue b is underutilized, so queue a has to release 9 containers.
+ * Within queue a, the higher-priority app2 also has a demand of 20, so 11
+ * more containers are preempted, making a total of 20.
+ */
+
+ // Cap intra-queue preemption at 50%.
+ conf.setFloat(
+ CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+ 0.5f);
+
+ String labelsConfig = "=100,true;";
+ String nodesConfig = // n1 has no label
+ "n1= res=100";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 95 170 0]);" + // root
+ "-a(=[60 100 70 50 0]);" + // a
+ "-b(=[40 100 25 120 0])"; // b
+
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+ "a\t" // app1 in a
+ + "(1,1,n1,,50,false,15);" + // priority 1
+ "a\t" // app2 in a
+ + "(2,1,n1,,20,false,20);" + // priority 2
+ "b\t" // app3 in b
+ + "(4,1,n1,,20,false,20);" + // priority 4
+ "b\t" // app4 in b
+ + "(4,1,n1,,5,false,100)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // Intra-queue preemption needs 20 containers for app2 in queue a. Inter-
+ // queue preemption had already selected 9 of them, so only 11 more are
+ // preempted; all 20 come from app1 and none from app2.
+ verify(mDisp, times(20))
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ }
+
+ @Test
+ public void testSkipAMContainersInInterQueuePreemption() throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ *
+ * Scenario:
+ * Guaranteed resources of a/b are 60:40; total cluster resource is 100.
+ * AM containers are currently spared when containers are preempted during
+ * intra-queue preemption. Verify this.
+ */
+
+ // Cap intra-queue preemption at 50%.
+ conf.setFloat(
+ CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+ 0.5f);
+
+ String labelsConfig = "=100,true;";
+ String nodesConfig = // n1 has no label
+ "n1= res=100";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 100 170 0]);" + // root
+ "-a(=[60 100 60 50 0]);" + // a
+ "-b(=[40 100 40 120 0])"; // b
+
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+ "a\t" // app1 in a
+ + "(1,1,n1,,30,false,10);" + "a\t" // app2 in a
+ + "(1,1,n1,,10,false,20);" + "a\t" // app3 in a
+ + "(2,1,n1,,20,false,20);" + "b\t" // app4 in b
+ + "(4,1,n1,,20,false,20);" + "b\t" // app5 in b
+ + "(4,1,n1,,20,false,100)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // 11 containers come from app1; only 9 come from app2 (its AM is spared).
+ verify(mDisp, times(11))
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, times(9))
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ }
+
+ @Test
+ public void testSkipAMContainersInInterQueuePreemptionSingleApp()
+ throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ *
+ * Scenario:
+ * Guaranteed resources of a/b are 50:50; total cluster resource is 100.
+ * The AM container of the lower-priority app must be spared during its
+ * preemption, even though demand remains and there is no other
+ * lower-priority app to preempt from.
+ */
+
+ String labelsConfig = "=100,true;";
+ String nodesConfig = // n1 has no label
+ "n1= res=100";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 100 170 0]);" + // root
+ "-a(=[50 100 50 50 0]);" + // a
+ "-b(=[50 100 50 120 0])"; // b
+
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+ "a\t" // app1 in a
+ + "(1,1,n1,,10,false,10);" + "a\t" // app2 in a
+ + "(2,1,n1,,40,false,10);" + "b\t" // app3 in b
+ + "(4,1,n1,,20,false,20);" + "b\t" // app4 in b
+ + "(4,1,n1,,30,false,100)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // app1's AM container is spared: only 9 of its 10 containers are taken.
+ verify(mDisp, times(9))
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ }
+
+ @Test
+ public void testNoPreemptionForSingleApp() throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ *
+ * Scenario:
+ * Guaranteed resources of a/b are 60:40; total cluster resource is 100.
+ * Only one app runs in queue a. It has more demand, but no resources are
+ * available in the queue. Preemption must not occur here.
+ */
+
+ String labelsConfig = "=100,true;";
+ String nodesConfig = // n1 has no label
+ "n1= res=100";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 20 50 0]);" + // root
+ "-a(=[60 100 20 50 0]);" + // a
+ "-b(=[40 100 0 0 0])"; // b
+
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+ "a\t" // app1 in a
+ + "(4,1,n1,,20,false,50)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // The only app in the queue must never be preempted.
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ }
+
+ @Test
+ public void testOverutilizedQueueResourceWithInterQueuePreemption()
+ throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ * Scenario:
+ * Guaranteed resources of a/b are 20:80; total cluster resource is 100.
+ * Queue b is underutilized, so 20 resources are released from queue a.
+ * 10 containers are also selected by intra-queue preemption, but those
+ * are already pre-selected.
+ */
+
+ String labelsConfig = "=100,true;";
+ String nodesConfig = // n1 has no label
+ "n1= res=100";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 100 70 0]);" + // root
+ "-a(=[20 100 100 30 0]);" + // a
+ "-b(=[80 100 0 20 0])"; // b
+
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+ "a\t" // app1 in a
+ + "(1,1,n1,,50,false,0);" + "a\t" // app2 in a
+ + "(3,1,n1,,50,false,30);" + "b\t" // app3 in b
+ + "(4,1,n1,,0,false,20)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // The full 20-resource demand of queue b must be preempted (from app1).
+ verify(mDisp, times(20))
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+ }
+
+ @Test
+ public void testNodePartitionIntraQueuePreemption() throws IOException {
+ /**
+ * The simplest node-label test. Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ *
+ * Scenario:
+ * Both a/b can access x, and guaranteed capacity of them is 50:50. Two
+ * nodes: n1 has 100 x, n2 has 100 NO_LABEL. 5 applications in the cluster:
+ * app1/app2/app3 in a, and app4/app5 in b. app1 uses 50 x, app2 uses 0 x,
+ * app3 uses 50 NO_LABEL, app4 uses 50 x, app5 uses 50 NO_LABEL. app2 runs
+ * at priority 2 and has 20 pending resource for x.
+ *
+ * After preemption, it should preempt 20 from app1
+ */
+
+ // Cap intra-queue preemption at 50%.
+ conf.setFloat(
+ CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+ 0.5f);
+
+ String labelsConfig = "=100,true;" + // default partition
+ "x=100,true"; // partition=x
+ String nodesConfig = "n1=x;" + // n1 has partition=x
+ "n2="; // n2 is default partition
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100 100 100 100],x=[100 100 100 100]);" + // root
+ "-a(=[50 100 50 50],x=[50 100 50 50]);" + // a
+ "-b(=[50 100 50 50],x=[50 100 50 50])"; // b
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+ "a\t" // app1 in a
+ + "(1,1,n1,x,50,false,10);" + // 50 * x in n1
+ "a\t" // app2 in a
+ + "(2,1,n1,x,0,false,20);" + // 0 * x in n1
+ "a\t" // app3 in a
+ + "(1,1,n2,,50,false);" + // 50 default in n2
+ "b\t" // app4 in b
+ + "(1,1,n1,x,50,false);" + // 50 * x in n1
+ "b\t" // app5 in b
+ + "(1,1,n2,,50,false)"; // 50 default in n2
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // 20 preempted from app1; none from app2 or app3
+ verify(mDisp, times(20))
+ .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+ verify(mDisp, never())
+ .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+ }
+
+  @Test
+  public void testComplexIntraQueuePreemption() throws IOException {
+    /**
+     * The complex test preemption, Queue structure is:
+     *
+     * <pre>
+     *          root
+     *      /  |   |  \
+     *     a   b   c   d
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resources of a/b/c/d are 10:40:20:30, total cluster
+     * resource = 100.
+     * All queues are under their capacity, but within each queue there are
+     * many under-served applications.
+     */
+
+    // Set total preemption per round to 50% (report "ideal" preempt as 50%),
+    // so that preemption happens only for 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        (float) 0.5);
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 75 130 0]);" + // root
+        "-a(=[10 100 5 50 0]);" + // a
+        "-b(=[40 100 35 60 0]);" + // b
+        "-c(=[20 100 10 10 0]);" + // c
+        "-d(=[30 100 25 10 0])"; // d
+
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved,
+        // pending)
+        "a\t" + "(1,1,n1,,5,false,25);" + // app1 in a
+        "a\t" + "(4,1,n1,,0,false,25);" + // app2 in a
+        "a\t" + "(5,1,n1,,0,false,2);" + // app3 in a
+        "b\t" + "(3,1,n1,,5,false,20);" + // app4 in b
+        "b\t" + "(4,1,n1,,15,false,10);" + // app5 in b
+        "b\t" + "(4,1,n1,,10,false,10);" + // app6 in b
+        "b\t" + "(5,1,n1,,3,false,5);" + // app7 in b
+        "b\t" + "(5,1,n1,,0,false,2);" + // app8 in b
+        "b\t" + "(6,1,n1,,2,false,10);" + // app9 in b
+        "c\t" + "(1,1,n1,,8,false,10);" + // app10 in c
+        "c\t" + "(1,1,n1,,2,false,5);" + // app11 in c
+        "c\t" + "(2,1,n1,,0,false,3);" + // app12 in c
+        "d\t" + "(2,1,n1,,25,false,0);" + // app13 in d
+        "d\t" + "(1,1,n1,,0,false,20)"; // app14 in d
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Higher-priority apps in queue a (app2, app3) have 27 pending, but the
+    // low-priority app1 uses only 5. Hence preempt 4 from app1, sparing its
+    // AM container.
+    verify(mDisp, times(4)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    // Higher-priority apps app7, app8 and app9 in queue b have a combined
+    // demand of 17. It is preempted from the lower-priority apps app4, app5
+    // and app6.
+    verify(mDisp, times(4)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+    verify(mDisp, times(9)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(6))));
+    verify(mDisp, times(4)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(5))));
+    // Only 3 resources are freed in this round for queue c (2 from app10,
+    // 1 from app11), as AM containers are spared.
+    verify(mDisp, times(2)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(10))));
+    verify(mDisp, times(1)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(11))));
+  }
+
+  @Test
+  public void testIntraQueuePreemptionWithTwoUsers()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resources of a/b are 60:40, total cluster resource = 100.
+     * Consider 2 users (user1, user2) in queue b, and assume the minimum user
+     * limit is 50%. Hence in queue b of 40, each user has a quota of 20.
+     * app4 of high priority has a demand of 30 and is already using 5.
+     * Adhering to the user limit, only 15 more must be preempted. If it were
+     * the same user, 20 would have been preempted.
+     */
+
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 55 170 0]);" + // root
+        "-a(=[60 100 10 50 0]);" + // a
+        "-b(=[40 100 40 120 0])"; // b
+
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved,
+        // pending,user)
+        "a\t" + "(1,1,n1,,5,false,25);" + // app1 in a
+        "a\t" + "(2,1,n1,,5,false,25);" + // app2 in a
+        "b\t" + "(4,1,n1,,35,false,20,user1);" + // app3 in b, user1
+        "b\t" + "(6,1,n1,,5,false,30,user2)"; // app4 in b, user2
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Considering a user limit of 50% since only 2 users are there, only
+    // preempt 15 from app3 (app4 already uses 5), even though app4's demand
+    // is 30.
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testComplexNodePartitionIntraQueuePreemption()
+      throws IOException {
+    /**
+     * A more complex test of node label, Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Both a/b can access x, and guaranteed capacity of them is 50:50. Two
+     * nodes: n1 has 100 x, n2 has 100 NO_LABEL. 9 applications in the
+     * cluster: app1-app4 in a, and app5-app9 in b. In partition x, the
+     * higher-priority app3 in a has 20 pending. In the default partition,
+     * the higher-priority app9 in b has 30 pending.
+     */
+
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig = "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + // root
+        "-a(=[50 100 50 50],x=[50 100 40 50]);" + // a
+        "-b(=[50 100 35 50],x=[50 100 50 50])"; // b
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" + "(1,1,n1,x,35,false,10);" + // app1 in a: 35 * x in n1
+        "a\t" + "(1,1,n1,x,5,false,10);" + // app2 in a: 5 * x in n1
+        "a\t" + "(2,1,n1,x,0,false,20);" + // app3 in a: 0 * x in n1
+        "a\t" + "(1,1,n2,,50,false);" + // app4 in a: 50 default in n2
+        "b\t" + "(1,1,n1,x,50,false);" + // app5 in b: 50 * x in n1
+        "b\t" + "(1,1,n2,,25,false);" + // app6 in b: 25 default in n2
+        "b\t" + "(1,1,n2,,3,false);" + // app7 in b: 3 default in n2
+        "b\t" + "(1,1,n2,,2,false);" + // app8 in b: 2 default in n2
+        "b\t" + "(5,1,n2,,5,false,30)"; // app9 in b: 5 default in n2
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Label x: app3 has a demand of 20 for label x. Hence app2 loses 4
+    // (sparing its AM) and app1 loses 16 more, until the preemption limit is
+    // met.
+    verify(mDisp, times(16))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(4))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+
+    // Default label: for app9's demand of 30, preempt from all low-priority
+    // apps of the default label. Only 25 are preempted, because the
+    // preemption limit is met.
+    verify(mDisp, times(1))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(8))));
+    verify(mDisp, times(2))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(7))));
+    verify(mDisp, times(22))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6))));
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: YARN-2009. CapacityScheduler: Add
intra-queue preemption for app priority support. (Sunil G via wangda)
Posted by ep...@apache.org.
YARN-2009. CapacityScheduler: Add intra-queue preemption for app priority support. (Sunil G via wangda)
(cherry-picked from commit 90dd3a8148468ac37a3f2173ad8d45e38bfcb0c9)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a18ae84
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a18ae84
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a18ae84
Branch: refs/heads/branch-2.8
Commit: 6a18ae849faa9becccd66b382296213a6c08842e
Parents: 01b50b3
Author: Wangda Tan <wa...@apache.org>
Authored: Mon Oct 31 15:18:31 2016 -0700
Committer: Eric Payne <ep...@apache.org>
Committed: Mon Dec 12 23:13:08 2016 +0000
----------------------------------------------------------------------
.../AbstractPreemptableResourceCalculator.java | 244 ++++++
.../capacity/AbstractPreemptionEntity.java | 98 +++
.../CapacitySchedulerPreemptionContext.java | 14 +
.../CapacitySchedulerPreemptionUtils.java | 119 ++-
.../capacity/FifoCandidatesSelector.java | 129 +--
.../FifoIntraQueuePreemptionPlugin.java | 459 ++++++++++
.../capacity/IntraQueueCandidatesSelector.java | 238 +++++
.../IntraQueuePreemptionComputePlugin.java | 39 +
.../capacity/PreemptableResourceCalculator.java | 183 +---
.../capacity/PreemptionCandidatesSelector.java | 32 +-
.../ProportionalCapacityPreemptionPolicy.java | 86 +-
.../monitor/capacity/TempAppPerPartition.java | 101 +++
.../monitor/capacity/TempQueuePerPartition.java | 142 ++-
.../CapacitySchedulerConfiguration.java | 31 +
.../scheduler/capacity/LeafQueue.java | 40 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 18 +
...alCapacityPreemptionPolicyMockFramework.java | 126 ++-
...ionalCapacityPreemptionPolicyIntraQueue.java | 868 +++++++++++++++++++
18 files changed, 2552 insertions(+), 415 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
new file mode 100644
index 0000000..8255a30
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Calculate how much resources need to be preempted for each queue,
+ * will be used by {@link PreemptionCandidatesSelector}.
+ * <p>
+ * Provides the protected fix-point allocation helper
+ * {@link #computeFixpointAllocation}. As a side effect, that helper records
+ * under-served queues per partition in the preemption context.
+ */
+public class AbstractPreemptableResourceCalculator {
+
+ protected final CapacitySchedulerPreemptionContext context;
+ protected final ResourceCalculator rc;
+ // Forwarded to TempQueuePerPartition#offer; see the constructor javadoc.
+ private boolean isReservedPreemptionCandidatesSelector;
+
+ /**
+ * Orders queues ascending by idealAssigned / guaranteed, so the head of a
+ * PriorityQueue using this comparator is the most under-served queue.
+ */
+ static class TQComparator implements Comparator<TempQueuePerPartition> {
+ private ResourceCalculator rc;
+ private Resource clusterRes;
+
+ TQComparator(ResourceCalculator rc, Resource clusterRes) {
+ this.rc = rc;
+ this.clusterRes = clusterRes;
+ }
+
+ @Override
+ public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
+ if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
+ return -1;
+ }
+ if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
+ return 1;
+ }
+ return 0;
+ }
+
+ // Calculates idealAssigned / guaranteed
+ // TempQueues with 0 guarantees are always considered the most over
+ // capacity and therefore considered last for resources.
+ // A null queue is likewise reported as Integer.MAX_VALUE.
+ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
+ double pctOver = Integer.MAX_VALUE;
+ if (q != null && Resources.greaterThan(rc, clusterRes, q.getGuaranteed(),
+ Resources.none())) {
+ pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
+ q.getGuaranteed());
+ }
+ return (pctOver);
+ }
+ }
+
+ /**
+ * AbstractPreemptableResourceCalculator constructor.
+ *
+ * @param preemptionContext context
+ * @param isReservedPreemptionCandidatesSelector
+ * this will be set by different implementation of candidate
+ * selectors, please refer to TempQueuePerPartition#offer for
+ * details.
+ */
+ public AbstractPreemptableResourceCalculator(
+ CapacitySchedulerPreemptionContext preemptionContext,
+ boolean isReservedPreemptionCandidatesSelector) {
+ context = preemptionContext;
+ rc = preemptionContext.getResourceCalculator();
+ this.isReservedPreemptionCandidatesSelector =
+ isReservedPreemptionCandidatesSelector;
+ }
+
+ /**
+ * Given a set of queues compute the fix-point distribution of unassigned
+ * resources among them. As pending request of a queue are exhausted, the
+ * queue is removed from the set and remaining capacity redistributed among
+ * remaining queues. The distribution is weighted based on guaranteed
+ * capacity, unless asked to ignoreGuarantee, in which case resources are
+ * distributed uniformly.
+ * <p>
+ * Side effects: sets idealAssigned / normalizedGuarantee on the queues,
+ * subtracts the handed-out amounts from {@code unassigned}, and records
+ * under-served queues via context.addPartitionToUnderServedQueues.
+ *
+ * @param totGuarant
+ * total guaranteed resource
+ * @param qAlloc
+ * List of child queues
+ * @param unassigned
+ * Unassigned resource per queue
+ * @param ignoreGuarantee
+ * ignore guarantee per queue.
+ */
+ protected void computeFixpointAllocation(Resource totGuarant,
+ Collection<TempQueuePerPartition> qAlloc, Resource unassigned,
+ boolean ignoreGuarantee) {
+ // Prior to assigning the unused resources, process each queue as follows:
+ // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
+ // Else idealAssigned = current;
+ // Subtract idealAssigned resources from unassigned.
+ // If the queue has all of its needs met (that is, if
+ // idealAssigned >= current + pending), remove it from consideration.
+ // Sort queues from most under-guaranteed to most over-guaranteed.
+ TQComparator tqComparator = new TQComparator(rc, totGuarant);
+ PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
+ tqComparator);
+ for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
+ TempQueuePerPartition q = i.next();
+ Resource used = q.getUsed();
+
+ if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
+ q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);
+ } else {
+ q.idealAssigned = Resources.clone(used);
+ }
+ Resources.subtractFrom(unassigned, q.idealAssigned);
+ // If idealAssigned < (used + pending), q needs more resources, so
+ // add it to the list of underserved queues, ordered by need.
+ Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
+ if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {
+ orderedByNeed.add(q);
+ }
+ }
+
+ // assign all cluster resources until no more demand, or no resources are
+ // left
+ while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
+ unassigned, Resources.none())) {
+ Resource wQassigned = Resource.newInstance(0, 0);
+ // we compute normalizedGuarantees capacity based on currently active
+ // queues
+ resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
+
+ // For each underserved queue (or set of queues if multiple are equally
+ // underserved), offer its share of the unassigned resources based on its
+ // normalized guarantee. After the offer, if the queue is not satisfied,
+ // place it back in the ordered list of queues, recalculating its place
+ // in the order of most under-guaranteed to most over-guaranteed. In this
+ // way, the most underserved queue(s) are always given resources first.
+ Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
+ orderedByNeed, tqComparator);
+ for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
+ .hasNext();) {
+ TempQueuePerPartition sub = i.next();
+ Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,
+ sub.normalizedGuarantee, Resource.newInstance(1, 1));
+ Resource wQidle = sub.offer(wQavail, rc, totGuarant,
+ isReservedPreemptionCandidatesSelector);
+ Resource wQdone = Resources.subtract(wQavail, wQidle);
+
+ if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
+ // The queue is still asking for more. Put it back in the priority
+ // queue, recalculating its order based on need.
+ orderedByNeed.add(sub);
+ }
+ Resources.addTo(wQassigned, wQdone);
+ }
+ Resources.subtractFrom(unassigned, wQassigned);
+ }
+
+ // Queues still left in orderedByNeed when the loop ends (no unassigned
+ // resources remain) are recorded as under-served as well (some may
+ // already have been recorded). Even when inter-queue allocation needs no
+ // preemption, apps within such a queue can be imbalanced, so intra-queue
+ // preemption must be able to see every one of these queues.
+ while (!orderedByNeed.isEmpty()) {
+ TempQueuePerPartition q1 = orderedByNeed.remove();
+ context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
+ }
+ }
+
+ /**
+ * Computes a normalizedGuaranteed capacity based on active queues.
+ *
+ * @param clusterResource
+ * the total amount of resources in the cluster
+ * (NOTE(review): computeFixpointAllocation passes the currently
+ * unassigned resource here)
+ * @param queues
+ * the list of queues to consider
+ * @param ignoreGuar
+ * ignore guarantee; when true every queue gets an equal share of
+ * 1 / queues.size().
+ */
+ private void resetCapacity(Resource clusterResource,
+ Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
+ Resource activeCap = Resource.newInstance(0, 0);
+
+ if (ignoreGuar) {
+ for (TempQueuePerPartition q : queues) {
+ q.normalizedGuarantee = 1.0f / queues.size();
+ }
+ } else {
+ for (TempQueuePerPartition q : queues) {
+ Resources.addTo(activeCap, q.getGuaranteed());
+ }
+ for (TempQueuePerPartition q : queues) {
+ q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+ q.getGuaranteed(), activeCap);
+ }
+ }
+ }
+
+ // Take the most underserved TempQueue (the one on the head). Collect and
+ // return the list of all queues that have the same idealAssigned
+ // percentage of guaranteed.
+ // Returned queues are removed from orderedByNeed; the caller re-adds those
+ // still in need. Each returned queue, and the next queue left at the head
+ // of orderedByNeed (if any), is recorded in the context as under-served.
+ private Collection<TempQueuePerPartition> getMostUnderservedQueues(
+ PriorityQueue<TempQueuePerPartition> orderedByNeed,
+ TQComparator tqComparator) {
+ ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
+ while (!orderedByNeed.isEmpty()) {
+ TempQueuePerPartition q1 = orderedByNeed.remove();
+ underserved.add(q1);
+
+ // Add underserved queues in order for later uses
+ context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
+ TempQueuePerPartition q2 = orderedByNeed.peek();
+ // q1's pct of guaranteed won't be larger than q2's. If it's less, then
+ // return what has already been collected. Otherwise, q1's pct of
+ // guaranteed == that of q2, so add q2 to underserved list during the
+ // next pass.
+ if (q2 == null || tqComparator.compare(q1, q2) < 0) {
+ if (null != q2) {
+ context.addPartitionToUnderServedQueues(q2.queueName, q2.partition);
+ }
+ return underserved;
+ }
+ }
+ return underserved;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
new file mode 100644
index 0000000..dbd1f0a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+
+/**
+ * Abstract temporary data-structure for tracking resource availability,
+ * pending resource need and current utilization of an app or a queue.
+ */
+public class AbstractPreemptionEntity {
+  // Values supplied by the scheduler when the entity is constructed.
+  final String queueName;
+  protected final Resource current;
+  protected final Resource amUsed;
+  protected final Resource reserved;
+  protected Resource pending;
+
+  // Working values filled in and read by the candidate selection policies.
+  Resource idealAssigned;
+  Resource toBePreempted;
+  Resource selected;
+  private Resource actuallyToBePreempted;
+  private Resource toBePreemptFromOther;
+
+  AbstractPreemptionEntity(String queueName, Resource used,
+      Resource amUsedResource, Resource reservedResource,
+      Resource pendingResource) {
+    this.queueName = queueName;
+    current = used;
+    amUsed = amUsedResource;
+    reserved = reservedResource;
+    pending = pendingResource;
+
+    // Every working value starts at zero, each with its own Resource
+    // instance.
+    idealAssigned = Resource.newInstance(0, 0);
+    toBePreempted = Resource.newInstance(0, 0);
+    selected = Resource.newInstance(0, 0);
+    actuallyToBePreempted = Resource.newInstance(0, 0);
+    toBePreemptFromOther = Resource.newInstance(0, 0);
+  }
+
+  /** @return the resource used by this entity. */
+  public Resource getUsed() {
+    return current;
+  }
+
+  /** @return used resource minus the resource used by AM containers. */
+  public Resource getUsedDeductAM() {
+    return Resources.subtract(current, amUsed);
+  }
+
+  /** @return the resource used by AM containers. */
+  public Resource getAMUsed() {
+    return amUsed;
+  }
+
+  /** @return the pending resource. */
+  public Resource getPending() {
+    return pending;
+  }
+
+  /** @return the reserved resource. */
+  public Resource getReserved() {
+    return reserved;
+  }
+
+  /** @return the resource selected to actually be preempted. */
+  public Resource getActuallyToBePreempted() {
+    return actuallyToBePreempted;
+  }
+
+  /** @param value the new actually-to-be-preempted resource. */
+  public void setActuallyToBePreempted(Resource value) {
+    actuallyToBePreempted = value;
+  }
+
+  /** @return the resource to be preempted from others for this entity. */
+  public Resource getToBePreemptFromOther() {
+    return toBePreemptFromOther;
+  }
+
+  /** @param value the new to-be-preempted-from-others resource. */
+  public void setToBePreemptFromOther(Resource value) {
+    toBePreemptFromOther = value;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
index c52127d..982b1f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
@@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import java.util.Collection;
+import java.util.LinkedHashSet;
import java.util.Set;
interface CapacitySchedulerPreemptionContext {
@@ -49,4 +51,16 @@ interface CapacitySchedulerPreemptionContext {
Set<String> getLeafQueueNames();
Set<String> getAllPartitions();
+
+ int getClusterMaxApplicationPriority();
+
+ Resource getPartitionResource(String partition);
+
+ LinkedHashSet<String> getUnderServedQueuesPerPartition(String partition);
+
+ void addPartitionToUnderServedQueues(String queueName, String partition);
+
+ float getMinimumThresholdForIntraQueuePreemption();
+
+ float getMaxAllowableLimitForIntraQueuePreemption();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
index 42d8730..abad2a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
@@ -19,11 +19,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -40,7 +43,8 @@ public class CapacitySchedulerPreemptionUtils {
continue;
}
- // Only add resToObtainByPartition when actuallyToBePreempted resource >= 0
+ // Only add resToObtainByPartition when actuallyToBePreempted resource >=
+ // 0
if (Resources.greaterThan(context.getResourceCalculator(),
clusterResource, qT.getActuallyToBePreempted(), Resources.none())) {
resToObtainByPartition.put(qT.partition,
@@ -57,8 +61,8 @@ public class CapacitySchedulerPreemptionUtils {
return false;
}
- Set<RMContainer> containers = selectedCandidates.get(
- container.getApplicationAttemptId());
+ Set<RMContainer> containers = selectedCandidates
+ .get(container.getApplicationAttemptId());
if (containers == null) {
return false;
}
@@ -70,8 +74,8 @@ public class CapacitySchedulerPreemptionUtils {
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
for (Set<RMContainer> containers : selectedCandidates.values()) {
for (RMContainer c : containers) {
- SchedulerNode schedulerNode = context.getScheduler().getSchedulerNode(
- c.getAllocatedNode());
+ SchedulerNode schedulerNode = context.getScheduler()
+ .getSchedulerNode(c.getAllocatedNode());
if (null == schedulerNode) {
continue;
}
@@ -89,8 +93,113 @@ public class CapacitySchedulerPreemptionUtils {
if (null != res) {
tq.deductActuallyToBePreempted(context.getResourceCalculator(),
tq.totalPartitionResource, res);
+ Collection<TempAppPerPartition> tas = tq.getApps();
+ if (null == tas || tas.isEmpty()) {
+ continue;
+ }
+
+ deductPreemptableResourcePerApp(context, tq.totalPartitionResource,
+ tas, res, partition);
}
}
}
}
+
+  /**
+   * Calls {@code deductActuallyToBePreempted} with the same {@code res} on
+   * every temp app in {@code tas}.
+   *
+   * @param context preemption context supplying the resource calculator
+   * @param totalPartitionResource total resource of the partition
+   * @param tas temp apps of the queue
+   * @param res resource to deduct
+   * @param partition partition the deduction applies to
+   */
+  private static void deductPreemptableResourcePerApp(
+      CapacitySchedulerPreemptionContext context,
+      Resource totalPartitionResource, Collection<TempAppPerPartition> tas,
+      Resource res, String partition) {
+    ResourceCalculator calculator = context.getResourceCalculator();
+    for (TempAppPerPartition tempApp : tas) {
+      tempApp.deductActuallyToBePreempted(calculator, totalPartitionResource,
+          res, partition);
+    }
+  }
+
+  /**
+   * Tries to mark {@code rmContainer} as a preemption candidate.
+   *
+   * <p>The container is marked only when all of the following hold:
+   * <ul>
+   * <li>it is not already in {@code preemptMap};</li>
+   * <li>the scheduler still returns a node for the container's allocated
+   * node id;</li>
+   * <li>a positive amount is still to be obtained for that node's
+   * partition;</li>
+   * <li>the container's allocated resource fits in
+   * {@code totalPreemptionAllowed}.</li>
+   * </ul>
+   *
+   * <p>When it is marked, the container's allocated resource is deducted in
+   * place from the partition's entry in
+   * {@code resourceToObtainByPartitions} and from
+   * {@code totalPreemptionAllowed}. The partition entry is removed once
+   * nothing more needs to be obtained from it.
+   *
+   * @param rc
+   *          resource calculator used for all comparisons
+   * @param context
+   *          preemption context, used to look up the container's node
+   * @param resourceToObtainByPartitions
+   *          resource still to obtain, keyed by partition; updated in place
+   * @param rmContainer
+   *          candidate container
+   * @param clusterResource
+   *          total cluster resource, used for comparisons
+   * @param preemptMap
+   *          containers selected so far, keyed by attempt; the container is
+   *          added here when it is marked
+   * @param totalPreemptionAllowed
+   *          remaining preemption allowance for this round; updated in place
+   * @return {@code true} if the container was newly marked for preemption
+   */
+  public static boolean tryPreemptContainerAndDeductResToObtain(
+      ResourceCalculator rc, CapacitySchedulerPreemptionContext context,
+      Map<String, Resource> resourceToObtainByPartitions,
+      RMContainer rmContainer, Resource clusterResource,
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      Resource totalPreemptionAllowed) {
+    ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
+
+    // We will not account resource of a container twice or more
+    if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
+      return false;
+    }
+
+    String nodePartition = getPartitionByNodeId(context,
+        rmContainer.getAllocatedNode());
+    if (null == nodePartition) {
+      // The scheduler no longer returns a node for this container, so its
+      // partition is unknown and it cannot be accounted against any
+      // partition's demand.
+      return false;
+    }
+    Resource toObtainByPartition = resourceToObtainByPartitions
+        .get(nodePartition);
+
+    if (null != toObtainByPartition
+        && Resources.greaterThan(rc, clusterResource, toObtainByPartition,
+            Resources.none())
+        && Resources.fitsIn(rc, clusterResource,
+            rmContainer.getAllocatedResource(), totalPreemptionAllowed)) {
+      // Both subtractions mutate the caller's Resource objects in place.
+      Resources.subtractFrom(toObtainByPartition,
+          rmContainer.getAllocatedResource());
+      Resources.subtractFrom(totalPreemptionAllowed,
+          rmContainer.getAllocatedResource());
+
+      // When we have no more resource need to obtain, remove from map.
+      if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
+          Resources.none())) {
+        resourceToObtainByPartitions.remove(nodePartition);
+      }
+
+      // Add to preemptMap
+      addToPreemptMap(preemptMap, attemptId, rmContainer);
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Returns the partition of the node with the given id.
+   *
+   * @param context preemption context used to reach the scheduler
+   * @param nodeId id of the node
+   * @return the node's partition, or {@code null} when the scheduler returns
+   *         no {@link SchedulerNode} for {@code nodeId}. Other code in this
+   *         class also treats a null {@code getSchedulerNode} result as
+   *         possible.
+   */
+  private static String getPartitionByNodeId(
+      CapacitySchedulerPreemptionContext context, NodeId nodeId) {
+    SchedulerNode node = context.getScheduler().getSchedulerNode(nodeId);
+    return (null == node) ? null : node.getPartition();
+  }
+
+  /**
+   * Records {@code containerToPreempt} under {@code appAttemptId} in
+   * {@code preemptMap}, creating the per-attempt set on first use.
+   *
+   * @param preemptMap selected containers keyed by application attempt
+   * @param appAttemptId attempt owning the container
+   * @param containerToPreempt container to record
+   */
+  private static void addToPreemptMap(
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
+    Set<RMContainer> bucket = preemptMap.get(appAttemptId);
+    if (bucket != null) {
+      bucket.add(containerToPreempt);
+      return;
+    }
+    bucket = new HashSet<>();
+    bucket.add(containerToPreempt);
+    preemptMap.put(appAttemptId, bucket);
+  }
+
+  /**
+   * Tells whether {@code rmContainer} has already been recorded for
+   * {@code attemptId} in {@code preemptMap}.
+   *
+   * @param preemptMap selected containers keyed by application attempt
+   * @param attemptId attempt to look up
+   * @param rmContainer container to look for
+   * @return {@code true} if the attempt has a set containing the container
+   */
+  private static boolean preemptMapContains(
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      ApplicationAttemptId attemptId, RMContainer rmContainer) {
+    Set<RMContainer> bucket = preemptMap.get(attemptId);
+    return bucket != null && bucket.contains(rmContainer);
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index a8c62fd..33e4afc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -18,11 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -34,9 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -112,9 +107,11 @@ public class FifoCandidatesSelector
// Skip already selected containers
continue;
}
- boolean preempted = tryPreemptContainerAndDeductResToObtain(
- resToObtainByPartition, c, clusterResource, selectedCandidates,
- totalPreemptionAllowed);
+ boolean preempted = CapacitySchedulerPreemptionUtils
+ .tryPreemptContainerAndDeductResToObtain(rc,
+ preemptionContext, resToObtainByPartition, c,
+ clusterResource, selectedCandidates,
+ totalPreemptionAllowed);
if (!preempted) {
continue;
}
@@ -185,9 +182,10 @@ public class FifoCandidatesSelector
break;
}
- boolean preempted =
- tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
- clusterResource, preemptMap, totalPreemptionAllowed);
+ boolean preempted = CapacitySchedulerPreemptionUtils
+ .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
+ resToObtainByPartition, c, clusterResource, preemptMap,
+ totalPreemptionAllowed);
if (preempted) {
Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
}
@@ -195,68 +193,6 @@ public class FifoCandidatesSelector
skippedAMContainerlist.clear();
}
- private boolean preemptMapContains(
- Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
- ApplicationAttemptId attemptId, RMContainer rmContainer) {
- Set<RMContainer> rmContainers;
- if (null == (rmContainers = preemptMap.get(attemptId))) {
- return false;
- }
- return rmContainers.contains(rmContainer);
- }
-
- /**
- * Return should we preempt rmContainer. If we should, deduct from
- * <code>resourceToObtainByPartition</code>
- */
- private boolean tryPreemptContainerAndDeductResToObtain(
- Map<String, Resource> resourceToObtainByPartitions,
- RMContainer rmContainer, Resource clusterResource,
- Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
- Resource totalPreemptionAllowed) {
- ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
-
- // We will not account resource of a container twice or more
- if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
- return false;
- }
-
- String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode());
- Resource toObtainByPartition =
- resourceToObtainByPartitions.get(nodePartition);
-
- if (null != toObtainByPartition && Resources.greaterThan(rc,
- clusterResource, toObtainByPartition, Resources.none()) && Resources
- .fitsIn(rc, clusterResource, rmContainer.getAllocatedResource(),
- totalPreemptionAllowed)) {
- Resources.subtractFrom(toObtainByPartition,
- rmContainer.getAllocatedResource());
- Resources.subtractFrom(totalPreemptionAllowed,
- rmContainer.getAllocatedResource());
-
- // When we have no more resource need to obtain, remove from map.
- if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
- Resources.none())) {
- resourceToObtainByPartitions.remove(nodePartition);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(this.getClass().getName() + " Marked container=" + rmContainer
- .getContainerId() + " from partition=" + nodePartition + " queue="
- + rmContainer.getQueueName() + " to be preemption candidates");
- }
- // Add to preemptMap
- addToPreemptMap(preemptMap, attemptId, rmContainer);
- return true;
- }
-
- return false;
- }
-
- private String getPartitionByNodeId(NodeId nodeId) {
- return preemptionContext.getScheduler().getSchedulerNode(nodeId)
- .getPartition();
- }
-
/**
* Given a target preemption for a specific application, select containers
* to preempt (after unreserving all reservation for that app).
@@ -268,10 +204,6 @@ public class FifoCandidatesSelector
Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers,
Resource totalPreemptionAllowed) {
ApplicationAttemptId appId = app.getApplicationAttemptId();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Looking at application=" + app.getApplicationAttemptId()
- + " resourceToObtain=" + resToObtainByPartition);
- }
// first drop reserved containers towards rsrcPreempt
List<RMContainer> reservedContainers =
@@ -286,8 +218,9 @@ public class FifoCandidatesSelector
}
// Try to preempt this container
- tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
- clusterResource, selectedContainers, totalPreemptionAllowed);
+ CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
+ rc, preemptionContext, resToObtainByPartition, c, clusterResource,
+ selectedContainers, totalPreemptionAllowed);
if (!preemptionContext.isObserveOnly()) {
preemptionContext.getRMContext().getDispatcher().getEventHandler()
@@ -328,41 +261,9 @@ public class FifoCandidatesSelector
}
// Try to preempt this container
- tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
- clusterResource, selectedContainers, totalPreemptionAllowed);
- }
- }
-
- /**
- * Compare by reversed priority order first, and then reversed containerId
- * order
- * @param containers
- */
- @VisibleForTesting
- static void sortContainers(List<RMContainer> containers){
- Collections.sort(containers, new Comparator<RMContainer>() {
- @Override
- public int compare(RMContainer a, RMContainer b) {
- Comparator<Priority> c = new org.apache.hadoop.yarn.server
- .resourcemanager.resource.Priority.Comparator();
- int priorityComp = c.compare(b.getContainer().getPriority(),
- a.getContainer().getPriority());
- if (priorityComp != 0) {
- return priorityComp;
- }
- return b.getContainerId().compareTo(a.getContainerId());
- }
- });
- }
-
- private void addToPreemptMap(
- Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
- ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
- Set<RMContainer> set;
- if (null == (set = preemptMap.get(appAttemptId))) {
- set = new HashSet<>();
- preemptMap.put(appAttemptId, set);
+ CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
+ rc, preemptionContext, resToObtainByPartition, c, clusterResource,
+ selectedContainers, totalPreemptionAllowed);
}
- set.add(containerToPreempt);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
new file mode 100644
index 0000000..757f567
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * FifoIntraQueuePreemptionPlugin will handle intra-queue preemption for
+ * priority and user-limit.
+ */
+public class FifoIntraQueuePreemptionPlugin
+ implements
+ IntraQueuePreemptionComputePlugin {
+
+  private static final Log LOG =
+      LogFactory.getLog(FifoIntraQueuePreemptionPlugin.class);
+
+  /** Context through which temporary queues are looked up. */
+  protected final CapacitySchedulerPreemptionContext context;
+
+  /** Calculator used for every resource comparison made by this plugin. */
+  protected final ResourceCalculator rc;
+
+  /**
+   * Creates the plugin.
+   *
+   * @param rc resource calculator used for comparisons
+   * @param preemptionContext preemption context to read queue state from
+   */
+  public FifoIntraQueuePreemptionPlugin(ResourceCalculator rc,
+      CapacitySchedulerPreemptionContext preemptionContext) {
+    this.rc = rc;
+    this.context = preemptionContext;
+  }
+
+  /**
+   * Sums {@code getActuallyToBePreempted()} over every temp app held by the
+   * temp queue for {@code queueName} / {@code partition}.
+   *
+   * @param queueName queue to inspect
+   * @param partition partition to inspect
+   * @return a new map with a single entry: {@code partition} mapped to the sum
+   */
+  @Override
+  public Map<String, Resource> getResourceDemandFromAppsPerQueue(
+      String queueName, String partition) {
+    TempQueuePerPartition tq = context.getQueueByPartition(queueName,
+        partition);
+
+    Resource totalToPreempt = Resources.createResource(0, 0);
+    for (TempAppPerPartition tempApp : tq.getApps()) {
+      Resources.addTo(totalToPreempt, tempApp.getActuallyToBePreempted());
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Selected to preempt " + totalToPreempt
+          + " resource from partition:" + partition);
+    }
+
+    Map<String, Resource> resToObtainByPartition = new HashMap<>();
+    resToObtainByPartition.put(partition, totalToPreempt);
+    return resToObtainByPartition;
+  }
+
+ /**
+ * Computes an ideal allocation and a to-be-preempted amount for every app of
+ * {@code tq}'s leaf queue in {@code tq.partition}, and stores the resulting
+ * temp apps (ordered low to high priority) on {@code tq} via addAllApps.
+ *
+ * Side effect: {@code queueReassignableResource} is reduced in place, first
+ * by the queue's AM usage and then by each app's idealAssigned.
+ *
+ * @param clusterResource cluster resource used for comparisons
+ * @param partitionBasedResource resource passed to the per-user limit lookup
+ * @param tq temp queue for one partition; its leafQueue must be non-null
+ * @param selectedCandidates containers already chosen by earlier selectors
+ * @param totalPreemptedResourceAllowed per-round preemption cap
+ * @param queueReassignableResource queue resource still reassignable;
+ * mutated in place
+ * @param maxAllowablePreemptLimit multiplier applied to tq.getGuaranteed()
+ * to cap intra-queue preemption
+ */
+ @Override
+ public void computeAppsIdealAllocation(Resource clusterResource,
+ Resource partitionBasedResource, TempQueuePerPartition tq,
+ Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+ Resource totalPreemptedResourceAllowed,
+ Resource queueReassignableResource, float maxAllowablePreemptLimit) {
+
+ // 1. AM used resource can be considered as a frozen resource for now.
+ // Hence such containers in a queue can be omitted from the preemption
+ // calculation.
+ Map<String, Resource> perUserAMUsed = new HashMap<String, Resource>();
+ Resource amUsed = calculateUsedAMResourcesPerQueue(tq.partition,
+ tq.leafQueue, perUserAMUsed);
+ Resources.subtractFrom(queueReassignableResource, amUsed);
+
+ // 2. tq.leafQueue will not be null as we validated it in caller side
+ Collection<FiCaSchedulerApp> apps = tq.leafQueue.getAllApplications();
+
+ // We do not need preemption for a single app
+ // NOTE(review): queueReassignableResource has already been reduced above
+ // even when we return here; an empty queue (size 0) is not short-circuited.
+ if (apps.size() == 1) {
+ return;
+ }
+
+ // 3. Create all tempApps for internal calculation and return a queue whose
+ // head is the highest priority app.
+ TAPriorityComparator taComparator = new TAPriorityComparator();
+ PriorityQueue<TempAppPerPartition> orderedByPriority =
+ createTempAppForResCalculation(tq.partition, apps, taComparator);
+
+ // 4. Calculate idealAssigned per app based on the queue's unallocated
+ // resource. Also return apps arranged from lower priority to higher
+ // priority.
+ TreeSet<TempAppPerPartition> orderedApps =
+ calculateIdealAssignedResourcePerApp(clusterResource,
+ partitionBasedResource, tq, selectedCandidates,
+ queueReassignableResource, orderedByPriority, perUserAMUsed);
+
+ // 5. A configurable limit that defines the ideal allowable preemption:
+ // guaranteed * maxAllowablePreemptLimit, minus what is already to be
+ // preempted from this queue (floored at zero).
+ Resource maxIntraQueuePreemptable = Resources.multiply(tq.getGuaranteed(),
+ maxAllowablePreemptLimit);
+ if (Resources.greaterThan(rc, clusterResource, maxIntraQueuePreemptable,
+ tq.getActuallyToBePreempted())) {
+ Resources.subtractFrom(maxIntraQueuePreemptable,
+ tq.getActuallyToBePreempted());
+ } else {
+ maxIntraQueuePreemptable = Resource.newInstance(0, 0);
+ }
+
+ // 6. We have two configurations here, one is intra queue limit and second
+ // one is per-round limit for any time preemption. Take a minimum of these
+ Resource preemptionLimit = Resources.min(rc, clusterResource,
+ maxIntraQueuePreemptable, totalPreemptedResourceAllowed);
+
+ // 7. From lowest priority app onwards, calculate toBePreempted resource
+ // based on demand.
+ calculateToBePreemptedResourcePerApp(clusterResource, orderedApps,
+ preemptionLimit);
+
+ // Save all apps (low to high) to temp queue for further reference
+ tq.addAllApps(orderedApps);
+
+ // 8. There are chances that we may preempt for the demand from same
+ // priority level, such cases are to be validated out.
+ // NOTE(review): the cast assumes tq.getApps() hands back a TreeSet (such
+ // as the one passed to addAllApps); confirm in TempQueuePerPartition.
+ validateOutSameAppPriorityFromDemand(clusterResource,
+ (TreeSet<TempAppPerPartition>) tq.getApps());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition);
+ for (TempAppPerPartition tmpApp : tq.getApps()) {
+ LOG.debug(tmpApp);
+ }
+ }
+ }
+
+  /**
+   * Walks {@code orderedApps} in iteration order (lowest priority first, as
+   * built by the caller) and sets each app's {@code toBePreempted}:
+   *
+   * <pre>
+   * app.toBePreempted = min(max(app.used - app.idealAssigned - app.selected
+   *                             - app.amUsed, 0),
+   *                         remainingLimit)
+   * remainingLimit = remainingLimit - app.toBePreempted
+   * </pre>
+   *
+   * Apps with no used resource are skipped. The walk stops once the remaining
+   * limit is exhausted. Skipped and unvisited apps keep their current
+   * {@code toBePreempted}.
+   *
+   * @param clusterResource cluster resource used for comparisons
+   * @param orderedApps apps to visit, in the order preemption should happen
+   * @param preemptionLimit maximum total to preempt; the caller's object is
+   *          not modified (a new remaining limit is computed on each step)
+   */
+  private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
+      TreeSet<TempAppPerPartition> orderedApps, Resource preemptionLimit) {
+
+    for (TempAppPerPartition tmpApp : orderedApps) {
+      // preemptionLimit is only updated below, after this check, so once it
+      // is exhausted every remaining app would be skipped as well.
+      if (Resources.lessThanOrEqual(rc, clusterResource, preemptionLimit,
+          Resources.none())) {
+        break;
+      }
+      if (Resources.lessThanOrEqual(rc, clusterResource, tmpApp.getUsed(),
+          Resources.none())) {
+        continue;
+      }
+
+      Resource preemptableFromApp = Resources.subtract(tmpApp.getUsed(),
+          tmpApp.idealAssigned);
+      Resources.subtractFrom(preemptableFromApp, tmpApp.selected);
+      // AM usage is treated as frozen and excluded from what can be preempted.
+      Resources.subtractFrom(preemptableFromApp, tmpApp.getAMUsed());
+
+      tmpApp.toBePreempted = Resources.min(rc, clusterResource,
+          Resources.max(rc, clusterResource, preemptableFromApp,
+              Resources.none()),
+          preemptionLimit);
+
+      preemptionLimit = Resources.subtract(preemptionLimit,
+          tmpApp.toBePreempted);
+    }
+  }
+
+ /**
+ * Computes idealAssigned for every app, visiting apps from highest to lowest
+ * priority, and returns all of them ordered from low to high priority.
+ *
+ * <pre>
+ * # Q.reassignable is the caller-supplied queueReassignableResource
+ * for app in orderedByPriority (highest priority first):
+ *   add app to the result set
+ *   if Q.reassignable &lt;= 0:
+ *     continue                # app keeps its initial idealAssigned
+ *   userLimit = Q.getUserLimitPerUser(app.user) - amUsed(app.user) # cached
+ *   app.selected = already-selected resource of app in this partition
+ *   demand = app.usedDeductAM + app.pending - app.selected
+ *   if userAssigned(app.user) &lt; userLimit:
+ *     demand = min(demand, userLimit - userAssigned(app.user))
+ *     app.idealAssigned = min(Q.reassignable, demand)
+ *     userAssigned(app.user) += app.idealAssigned
+ *   else:
+ *     continue                # user limit reached
+ *   if app.idealAssigned &gt; app.usedDeductAM - app.selected:
+ *     app.toBePreemptFromOther =
+ *         app.idealAssigned - (app.usedDeductAM - app.selected)
+ *   Q.reassignable -= app.idealAssigned
+ * </pre>
+ *
+ * @param clusterResource cluster resource used for comparisons
+ * @param partitionBasedResource resource passed to the per-user limit lookup
+ * @param tq temp queue (supplies the leaf queue and partition)
+ * @param selectedCandidates containers already selected by earlier selectors
+ * @param queueReassignableResource queue resource still available for
+ * assignment; decremented in place by each app's idealAssigned
+ * @param orderedByPriority apps to process; drained by this method
+ * @param perUserAMUsed AM resource used per user, deducted from user limits
+ * @return all processed temp apps, ordered from low to high priority
+ */
+ private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
+ Resource clusterResource, Resource partitionBasedResource,
+ TempQueuePerPartition tq,
+ Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+ Resource queueReassignableResource,
+ PriorityQueue<TempAppPerPartition> orderedByPriority,
+ Map<String, Resource> perUserAMUsed) {
+
+ // NOTE(review): a TreeSet silently drops elements its comparator reports
+ // as equal; this relies on TAPriorityComparator breaking ties between
+ // distinct apps of the same priority. Confirm against its implementation.
+ Comparator<TempAppPerPartition> reverseComp = Collections
+ .reverseOrder(new TAPriorityComparator());
+ TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp);
+
+ Map<String, Resource> userIdealAssignedMapping = new HashMap<>();
+ String partition = tq.partition;
+
+ Map<String, Resource> preCalculatedUserLimit =
+ new HashMap<String, Resource>();
+
+ while (!orderedByPriority.isEmpty()) {
+ // Remove app from the next highest remaining priority and process it to
+ // calculate idealAssigned per app.
+ TempAppPerPartition tmpApp = orderedByPriority.remove();
+ orderedApps.add(tmpApp);
+
+ // Once unallocated resource is 0, we can stop assigning ideal per app.
+ // (continue rather than break: the remaining apps must still be added to
+ // orderedApps above.)
+ if (Resources.lessThanOrEqual(rc, clusterResource,
+ queueReassignableResource, Resources.none())) {
+ continue;
+ }
+
+ String userName = tmpApp.app.getUser();
+ Resource userLimitResource = preCalculatedUserLimit.get(userName);
+
+ // Verify whether we already calculated the user limit for this user.
+ if (userLimitResource == null) {
+ userLimitResource = Resources.clone(tq.leafQueue
+ .getUserLimitPerUser(userName, partitionBasedResource, partition));
+
+ Resource amUsed = perUserAMUsed.get(userName);
+ if (null == amUsed) {
+ amUsed = Resources.createResource(0, 0);
+ }
+
+ // Real AM used need not have to be considered for user-limit as well.
+ userLimitResource = Resources.subtract(userLimitResource, amUsed);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Userlimit for user '" + userName + "' is :"
+ + userLimitResource + ", and amUsed is:" + amUsed);
+ }
+
+ preCalculatedUserLimit.put(userName, userLimitResource);
+ }
+
+ Resource idealAssignedForUser = userIdealAssignedMapping.get(userName);
+
+ if (idealAssignedForUser == null) {
+ idealAssignedForUser = Resources.createResource(0, 0);
+ userIdealAssignedMapping.put(userName, idealAssignedForUser);
+ }
+
+ // Calculate total selected container resources from current app.
+ // (Sets tmpApp.selected as a side effect.)
+ getAlreadySelectedPreemptionCandidatesResource(selectedCandidates,
+ tmpApp, partition);
+
+ // For any app, used+pending will give its idealAssigned. However it will
+ // be tightly linked to queue's unallocated quota. So lower priority apps
+ // idealAssigned may fall to 0 if higher priority apps demand is more.
+ Resource appIdealAssigned = Resources.add(tmpApp.getUsedDeductAM(),
+ tmpApp.getPending());
+ Resources.subtractFrom(appIdealAssigned, tmpApp.selected);
+
+ if (Resources.lessThan(rc, clusterResource, idealAssignedForUser,
+ userLimitResource)) {
+ appIdealAssigned = Resources.min(rc, clusterResource, appIdealAssigned,
+ Resources.subtract(userLimitResource, idealAssignedForUser));
+ tmpApp.idealAssigned = Resources.clone(Resources.min(rc,
+ clusterResource, queueReassignableResource, appIdealAssigned));
+ Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned);
+ } else {
+ // User limit already reached: this app keeps its current idealAssigned.
+ continue;
+ }
+
+ // Also set how much resource is needed by this app from others.
+ Resource appUsedExcludedSelected = Resources
+ .subtract(tmpApp.getUsedDeductAM(), tmpApp.selected);
+ if (Resources.greaterThan(rc, clusterResource, tmpApp.idealAssigned,
+ appUsedExcludedSelected)) {
+ tmpApp.setToBePreemptFromOther(
+ Resources.subtract(tmpApp.idealAssigned, appUsedExcludedSelected));
+ }
+
+ // Mutates the caller's queueReassignableResource in place.
+ Resources.subtractFrom(queueReassignableResource, tmpApp.idealAssigned);
+ }
+
+ return orderedApps;
+ }
+
+  /**
+   * Resets {@code tmpApp.selected} and fills it with the total allocated
+   * resource of the containers already chosen for this app's attempt by
+   * earlier policies. Only containers whose node label expression equals
+   * {@code partition} are counted.
+   *
+   * NOTE(review): this matches on the container's node label expression, not
+   * on the partition of the node the container actually runs on. Confirm the
+   * two agree for containers placed on non-exclusive partitions.
+   *
+   * @param selectedCandidates containers already selected, keyed by attempt
+   * @param tmpApp temp app whose {@code selected} field is rewritten
+   * @param partition partition to count
+   */
+  private void getAlreadySelectedPreemptionCandidatesResource(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      TempAppPerPartition tmpApp, String partition) {
+    Resource selectedTotal = Resources.createResource(0, 0);
+    tmpApp.selected = selectedTotal;
+
+    Set<RMContainer> chosen = selectedCandidates
+        .get(tmpApp.app.getApplicationAttemptId());
+    if (chosen != null) {
+      for (RMContainer container : chosen) {
+        if (partition.equals(container.getNodeLabelExpression())) {
+          Resources.addTo(selectedTotal, container.getAllocatedResource());
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a {@link TempAppPerPartition} snapshot of every app for
+   * {@code partition} and returns them in a priority queue ordered by
+   * {@code taComparator}.
+   *
+   * <p>Rules applied to every snapshot:
+   * <ul>
+   * <li>Absent per-partition values (used, AM, pending, reserved) are
+   * treated as zero.</li>
+   * <li>AM usage is only taken from apps that are not waiting for their AM
+   * container.</li>
+   * <li>Each snapshot starts with {@code idealAssigned} set to zero.</li>
+   * </ul>
+   *
+   * @param partition partition to snapshot
+   * @param apps apps of the leaf queue
+   * @param taComparator ordering of the returned queue
+   * @return priority queue of temp apps ordered by {@code taComparator}
+   */
+  private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
+      String partition, Collection<FiCaSchedulerApp> apps,
+      TAPriorityComparator taComparator) {
+    PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>(
+        100, taComparator);
+
+    // have an internal temp app structure to store intermediate data(priority)
+    for (FiCaSchedulerApp app : apps) {
+      Resource used = app.getAppAttemptResourceUsage().getUsed(partition);
+      Resource amUsed = null;
+      if (!app.isWaitingForAMContainer()) {
+        amUsed = app.getAMResource(partition);
+      }
+      Resource pending = app.getTotalPendingRequestsPerPartition()
+          .get(partition);
+      Resource reserved = app.getAppAttemptResourceUsage()
+          .getReserved(partition);
+
+      used = (used == null) ? Resources.createResource(0, 0) : used;
+      amUsed = (amUsed == null) ? Resources.createResource(0, 0) : amUsed;
+      pending = (pending == null) ? Resources.createResource(0, 0) : pending;
+      reserved = (reserved == null) ? Resources.createResource(0, 0) : reserved;
+
+      // Clone the values, presumably so that later arithmetic on the temp
+      // app cannot mutate Resource objects owned by the scheduler.
+      TempAppPerPartition tmpApp = new TempAppPerPartition(app,
+          Resources.clone(used), Resources.clone(amUsed),
+          Resources.clone(reserved), Resources.clone(pending));
+
+      // Set ideal allocation of app as 0.
+      tmpApp.idealAssigned = Resources.createResource(0, 0);
+
+      orderedByPriority.add(tmpApp);
+    }
+    return orderedByPriority;
+  }
+
+ /**
+ * Fifo+Priority based preemption need not preempt resources between apps at
+ * the same priority level. This pass only lets higher-priority apps' demand
+ * (toBePreemptFromOther) be satisfied from strictly lower-priority apps.
+ *
+ * Two-pointer walk over the apps, where index 0 is the lowest priority (as
+ * the parameter name indicates):
+ * - The low pointer's app may take more while toBePreempted is greater than
+ * actuallyToBePreempted.
+ * - The high pointer's app may give while its toBePreemptFromOther is
+ * non-zero.
+ * - On each step, Resources.min of the two amounts is added to the low app's
+ * actuallyToBePreempted and subtracted from the high app's
+ * toBePreemptFromOther.
+ * - A pointer moves inward once its side is satisfied. The walk stops when
+ * the pointers meet, or when the low app's priority is no longer strictly
+ * lower than the high app's.
+ *
+ * @param cluster cluster resource used for comparisons
+ * @param appsOrderedfromLowerPriority apps ordered from lower to higher
+ * priority; the elements are updated in place
+ */
+ public void validateOutSameAppPriorityFromDemand(Resource cluster,
+ TreeSet<TempAppPerPartition> appsOrderedfromLowerPriority) {
+
+ TempAppPerPartition[] apps = appsOrderedfromLowerPriority
+ .toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]);
+ if (apps.length <= 0) {
+ return;
+ }
+
+ int lPriority = 0;
+ int hPriority = apps.length - 1;
+
+ // NOTE(review): each iteration must advance lPriority or hPriority, or this
+ // loop never ends. That holds when Resources.min returns exactly one of its
+ // two arguments, so that one side becomes exactly satisfied. Worth covering
+ // with a DominantResourceCalculator test.
+ // The equals() check compares two distinct array slots and is presumably
+ // redundant unless TempAppPerPartition overrides equals.
+ while (lPriority < hPriority
+ && !apps[lPriority].equals(apps[hPriority])
+ && apps[lPriority].getPriority() < apps[hPriority].getPriority()) {
+ Resource toPreemptFromOther = apps[hPriority]
+ .getToBePreemptFromOther();
+ Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted();
+ // Room left on the low-priority app.
+ Resource delta = Resources.subtract(apps[lPriority].toBePreempted,
+ actuallyToPreempt);
+
+ if (Resources.greaterThan(rc, cluster, delta, Resources.none())) {
+ Resource toPreempt = Resources.min(rc, cluster,
+ toPreemptFromOther, delta);
+
+ // Move toPreempt from the high app's demand onto the low app.
+ apps[hPriority].setToBePreemptFromOther(
+ Resources.subtract(toPreemptFromOther, toPreempt));
+ apps[lPriority].setActuallyToBePreempted(
+ Resources.add(actuallyToPreempt, toPreempt));
+ }
+
+ // Low app fully used: move to the next higher-priority app.
+ if (Resources.lessThanOrEqual(rc, cluster,
+ apps[lPriority].toBePreempted,
+ apps[lPriority].getActuallyToBePreempted())) {
+ lPriority++;
+ continue;
+ }
+
+ // High app's demand fully met: move to the next lower-priority app.
+ if (Resources.equals(apps[hPriority].getToBePreemptFromOther(),
+ Resources.none())) {
+ hPriority--;
+ continue;
+ }
+ }
+ }
+
+  /**
+   * Sums {@code getAMResource(partition)} over
+   * {@code leafQueue.getApplications()}.
+   *
+   * NOTE(review): unlike createTempAppForResCalculation, this does not skip
+   * apps for which isWaitingForAMContainer() is true. Confirm that
+   * getAMResource reports nothing for such apps, or AM usage may be
+   * over-counted.
+   *
+   * @param partition partition whose AM usage is summed
+   * @param leafQueue queue whose applications are inspected
+   * @param perUserAMUsed per-user totals, accumulated in place (entries are
+   *          created on first use)
+   * @return total AM resource of the queue in {@code partition}
+   */
+  private Resource calculateUsedAMResourcesPerQueue(String partition,
+      LeafQueue leafQueue, Map<String, Resource> perUserAMUsed) {
+    Collection<FiCaSchedulerApp> runningApps = leafQueue.getApplications();
+    Resource totalAMUsed = Resources.createResource(0, 0);
+
+    for (FiCaSchedulerApp app : runningApps) {
+      String user = app.getUser();
+      Resource amOfUser = perUserAMUsed.get(user);
+      if (amOfUser == null) {
+        amOfUser = Resources.createResource(0, 0);
+        perUserAMUsed.put(user, amOfUser);
+      }
+
+      Resource appAM = app.getAMResource(partition);
+      Resources.addTo(amOfUser, appAM);
+      Resources.addTo(totalAMUsed, appAM);
+    }
+    return totalAMUsed;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
new file mode 100644
index 0000000..039b53e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Identifies over utilized resources within a queue and tries to normalize
+ * them to resolve resource allocation anomalies w.r.t priority and user-limit.
+ *
+ * <p>For every partition, leaf queues are visited in the order returned by
+ * {@code getUnderServedQueuesPerPartition}. Per-queue demand is computed by
+ * an {@link IntraQueuePreemptionComputePlugin}. Containers are then picked
+ * from the queue's applications in the order of the queue ordering policy's
+ * preemption iterator.
+ */
+public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
+
+  /**
+   * Orders {@link TempAppPerPartition}s by YARN {@link Priority}, using
+   * {@code Priority#compareTo}. Ties are broken by application id.
+   */
+  @SuppressWarnings("serial")
+  static class TAPriorityComparator
+      implements
+      Serializable,
+      Comparator<TempAppPerPartition> {
+
+    @Override
+    public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) {
+      Priority p1 = Priority.newInstance(tq1.getPriority());
+      Priority p2 = Priority.newInstance(tq2.getPriority());
+
+      if (!p1.equals(p2)) {
+        return p1.compareTo(p2);
+      }
+      return tq1.getApplicationId().compareTo(tq2.getApplicationId());
+    }
+  }
+
+  IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null;
+  final CapacitySchedulerPreemptionContext context;
+
+  private static final Log LOG =
+      LogFactory.getLog(IntraQueueCandidatesSelector.class);
+
+  IntraQueueCandidatesSelector(
+      CapacitySchedulerPreemptionContext preemptionContext) {
+    super(preemptionContext);
+    fifoPreemptionComputePlugin = new FifoIntraQueuePreemptionPlugin(rc,
+        preemptionContext);
+    context = preemptionContext;
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource clusterResource, Resource totalPreemptedResourceAllowed) {
+
+    // 1. Calculate the abnormality within each queue one by one.
+    computeIntraQueuePreemptionDemand(
+        clusterResource, totalPreemptedResourceAllowed, selectedCandidates);
+
+    // 2. Previous selectors (with higher priority) could have already
+    // selected containers. We need to deduct pre-emptable resources
+    // based on already selected candidates.
+    CapacitySchedulerPreemptionUtils
+        .deductPreemptableResourcesBasedSelectedCandidates(preemptionContext,
+            selectedCandidates);
+
+    // 3. Loop through all partitions to select containers for preemption.
+    for (String partition : preemptionContext.getAllPartitions()) {
+      LinkedHashSet<String> queueNames = preemptionContext
+          .getUnderServedQueuesPerPartition(partition);
+
+      // Error check to handle non-mapped labels to queue.
+      if (null == queueNames) {
+        continue;
+      }
+
+      // 4. Iterate from most under-served queue in order.
+      for (String queueName : queueNames) {
+        TempQueuePerPartition defaultPartitionQueue = preemptionContext
+            .getQueueByPartition(queueName, RMNodeLabelsManager.NO_LABEL);
+
+        // skip if the queue is unknown or is not a leaf queue
+        if (null == defaultPartitionQueue
+            || null == defaultPartitionQueue.leafQueue) {
+          continue;
+        }
+        LeafQueue leafQueue = defaultPartitionQueue.leafQueue;
+
+        // 5. Calculate the resource to obtain per partition
+        Map<String, Resource> resToObtainByPartition = fifoPreemptionComputePlugin
+            .getResourceDemandFromAppsPerQueue(queueName, partition);
+
+        // No demand for this queue: avoid taking its lock and walking (and
+        // sorting the containers of) every application.
+        if (null == resToObtainByPartition
+            || resToObtainByPartition.isEmpty()) {
+          continue;
+        }
+
+        // 6. Based on the selected resource demand per partition, select
+        // containers with known policy from inter-queue preemption.
+        synchronized (leafQueue) {
+          Iterator<FiCaSchedulerApp> desc = leafQueue.getOrderingPolicy()
+              .getPreemptionIterator();
+          // Stop visiting applications as soon as the demand is satisfied.
+          while (desc.hasNext() && !resToObtainByPartition.isEmpty()) {
+            FiCaSchedulerApp app = desc.next();
+            preemptFromLeastStarvedApp(selectedCandidates, clusterResource,
+                totalPreemptedResourceAllowed, resToObtainByPartition,
+                leafQueue, app);
+          }
+        }
+      }
+    }
+
+    return selectedCandidates;
+  }
+
+  /**
+   * Selects containers of {@code app} for preemption until
+   * {@code resToObtainByPartition} becomes empty or the app has no more
+   * eligible containers.
+   *
+   * <p>Containers are visited in the order produced by
+   * {@link #sortContainers(List)}. The following are skipped: containers
+   * already selected in {@code selectedCandidates}, containers already in
+   * the killable set, and AM containers.
+   */
+  private void preemptFromLeastStarvedApp(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource clusterResource, Resource totalPreemptedResourceAllowed,
+      Map<String, Resource> resToObtainByPartition, LeafQueue leafQueue,
+      FiCaSchedulerApp app) {
+
+    // if there is no demand, return before copying and sorting containers.
+    if (resToObtainByPartition.isEmpty()) {
+      return;
+    }
+
+    // TODO: Reuse reservation selector here.
+
+    List<RMContainer> liveContainers = new ArrayList<>(
+        app.getLiveContainers());
+    sortContainers(liveContainers);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "totalPreemptedResourceAllowed for preemption at this round is :"
+              + totalPreemptedResourceAllowed);
+    }
+
+    // Fetch the killable set once per app instead of twice per container.
+    Set<ContainerId> killableContainers =
+        preemptionContext.getKillableContainers();
+
+    for (RMContainer c : liveContainers) {
+
+      // if there is no demand left, return.
+      if (resToObtainByPartition.isEmpty()) {
+        return;
+      }
+
+      // skip preselected containers.
+      if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
+          selectedCandidates)) {
+        continue;
+      }
+
+      // Skip already marked to killable containers
+      if (null != killableContainers
+          && killableContainers.contains(c.getContainerId())) {
+        continue;
+      }
+
+      // Skip AM Container from preemption for now.
+      if (c.isAMContainer()) {
+        continue;
+      }
+
+      // Try to preempt this container
+      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
+          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
+          selectedCandidates, totalPreemptedResourceAllowed);
+    }
+
+  }
+
+  /**
+   * For every under-served leaf queue whose used capacity in a partition is
+   * at least {@code getMinimumThresholdForIntraQueuePreemption()}, asks the
+   * compute plugin to calculate the ideal allocation of the queue's apps.
+   */
+  private void computeIntraQueuePreemptionDemand(Resource clusterResource,
+      Resource totalPreemptedResourceAllowed,
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
+
+    // 1. Iterate through all partitions to calculate demand within a
+    // partition.
+    for (String partition : context.getAllPartitions()) {
+      LinkedHashSet<String> queueNames = context
+          .getUnderServedQueuesPerPartition(partition);
+
+      if (null == queueNames) {
+        continue;
+      }
+
+      // 2. It's better to get the partition based resource limit once,
+      // before starting the per-queue calculation.
+      Resource partitionBasedResource =
+          context.getPartitionResource(partition);
+
+      // 3. Loop through all queues corresponding to a partition.
+      for (String queueName : queueNames) {
+        TempQueuePerPartition tq = context.getQueueByPartition(queueName,
+            partition);
+
+        // skip if the queue is unknown or is a parent queue
+        if (null == tq || null == tq.leafQueue) {
+          continue;
+        }
+        LeafQueue leafQueue = tq.leafQueue;
+
+        // 4. Check queue's used capacity. Make sure that the used capacity is
+        // above certain limit to consider for intra queue preemption.
+        if (leafQueue.getQueueCapacities().getUsedCapacity(partition) < context
+            .getMinimumThresholdForIntraQueuePreemption()) {
+          continue;
+        }
+
+        // 5. Consider reassignableResource as (used - actuallyToBePreempted).
+        // This provides an upper limit to split apps quota in a queue.
+        Resource queueReassignableResource = Resources.subtract(tq.getUsed(),
+            tq.getActuallyToBePreempted());
+
+        // 6. compute the allocation of all apps based on queue's unallocated
+        // capacity
+        fifoPreemptionComputePlugin.computeAppsIdealAllocation(clusterResource,
+            partitionBasedResource, tq, selectedCandidates,
+            totalPreemptedResourceAllowed,
+            queueReassignableResource,
+            context.getMaxAllowableLimitForIntraQueuePreemption());
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
new file mode 100644
index 0000000..93ebe65
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+
+/**
+ * Pluggable computation used by {@link IntraQueueCandidatesSelector} to
+ * decide how much should be preempted inside a single leaf queue.
+ */
+interface IntraQueuePreemptionComputePlugin {
+
+  /**
+   * Computes the ideal allocation of the applications of one queue.
+   *
+   * @param clusterResource total cluster resource
+   * @param partitionBasedResource resource of the partition being evaluated
+   * @param tq the queue, as seen in that partition
+   * @param selectedCandidates containers already selected for preemption
+   * @param totalPreemptedResourceAllowed preemption allowance of this round
+   * @param queueTotalUnassigned resource that may be redistributed among the
+   *          queue's apps (IntraQueueCandidatesSelector passes used minus
+   *          actually-to-be-preempted)
+   * @param maxAllowablePreemptLimit configured upper limit for intra-queue
+   *          preemption
+   */
+  void computeAppsIdealAllocation(
+      Resource clusterResource,
+      Resource partitionBasedResource,
+      TempQueuePerPartition tq,
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource totalPreemptedResourceAllowed,
+      Resource queueTotalUnassigned,
+      float maxAllowablePreemptLimit);
+
+  /**
+   * Returns the resource that should be obtained from the applications of
+   * {@code queueName} in {@code partition}. The map is presumably keyed by
+   * partition name (callers treat it as "resource to obtain by partition");
+   * confirm against implementations.
+   *
+   * @param queueName queue to evaluate
+   * @param partition partition to evaluate
+   * @return resource still to be obtained, per partition
+   */
+  Map<String, Resource> getResourceDemandFromAppsPerQueue(
+      String queueName, String partition);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index 103b419..3017e8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -27,61 +27,22 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.PriorityQueue;
import java.util.Set;
/**
* Calculate how much resources need to be preempted for each queue,
* will be used by {@link PreemptionCandidatesSelector}
*/
-public class PreemptableResourceCalculator {
+public class PreemptableResourceCalculator
+ extends
+ AbstractPreemptableResourceCalculator {
private static final Log LOG =
LogFactory.getLog(PreemptableResourceCalculator.class);
- private final CapacitySchedulerPreemptionContext context;
- private final ResourceCalculator rc;
private boolean isReservedPreemptionCandidatesSelector;
- static class TQComparator implements Comparator<TempQueuePerPartition> {
- private ResourceCalculator rc;
- private Resource clusterRes;
-
- TQComparator(ResourceCalculator rc, Resource clusterRes) {
- this.rc = rc;
- this.clusterRes = clusterRes;
- }
-
- @Override
- public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
- if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
- return -1;
- }
- if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
- return 1;
- }
- return 0;
- }
-
- // Calculates idealAssigned / guaranteed
- // TempQueues with 0 guarantees are always considered the most over
- // capacity and therefore considered last for resources.
- private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
- double pctOver = Integer.MAX_VALUE;
- if (q != null && Resources.greaterThan(rc, clusterRes,
- q.getGuaranteed(),
- Resources.none())) {
- pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
- q.getGuaranteed());
- }
- return (pctOver);
- }
- }
-
/**
* PreemptableResourceCalculator constructor
*
@@ -93,136 +54,7 @@ public class PreemptableResourceCalculator {
public PreemptableResourceCalculator(
CapacitySchedulerPreemptionContext preemptionContext,
boolean isReservedPreemptionCandidatesSelector) {
- context = preemptionContext;
- rc = preemptionContext.getResourceCalculator();
- this.isReservedPreemptionCandidatesSelector =
- isReservedPreemptionCandidatesSelector;
- }
-
- /**
- * Computes a normalizedGuaranteed capacity based on active queues
- * @param rc resource calculator
- * @param clusterResource the total amount of resources in the cluster
- * @param queues the list of queues to consider
- */
- private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
- Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
- Resource activeCap = Resource.newInstance(0, 0);
-
- if (ignoreGuar) {
- for (TempQueuePerPartition q : queues) {
- q.normalizedGuarantee = 1.0f / queues.size();
- }
- } else {
- for (TempQueuePerPartition q : queues) {
- Resources.addTo(activeCap, q.getGuaranteed());
- }
- for (TempQueuePerPartition q : queues) {
- q.normalizedGuarantee = Resources.divide(rc, clusterResource,
- q.getGuaranteed(), activeCap);
- }
- }
- }
-
- // Take the most underserved TempQueue (the one on the head). Collect and
- // return the list of all queues that have the same idealAssigned
- // percentage of guaranteed.
- protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
- PriorityQueue<TempQueuePerPartition> orderedByNeed,
- TQComparator tqComparator) {
- ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
- while (!orderedByNeed.isEmpty()) {
- TempQueuePerPartition q1 = orderedByNeed.remove();
- underserved.add(q1);
- TempQueuePerPartition q2 = orderedByNeed.peek();
- // q1's pct of guaranteed won't be larger than q2's. If it's less, then
- // return what has already been collected. Otherwise, q1's pct of
- // guaranteed == that of q2, so add q2 to underserved list during the
- // next pass.
- if (q2 == null || tqComparator.compare(q1,q2) < 0) {
- return underserved;
- }
- }
- return underserved;
- }
-
-
- /**
- * Given a set of queues compute the fix-point distribution of unassigned
- * resources among them. As pending request of a queue are exhausted, the
- * queue is removed from the set and remaining capacity redistributed among
- * remaining queues. The distribution is weighted based on guaranteed
- * capacity, unless asked to ignoreGuarantee, in which case resources are
- * distributed uniformly.
- */
- private void computeFixpointAllocation(ResourceCalculator rc,
- Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
- Resource unassigned, boolean ignoreGuarantee) {
- // Prior to assigning the unused resources, process each queue as follows:
- // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
- // Else idealAssigned = current;
- // Subtract idealAssigned resources from unassigned.
- // If the queue has all of its needs met (that is, if
- // idealAssigned >= current + pending), remove it from consideration.
- // Sort queues from most under-guaranteed to most over-guaranteed.
- TQComparator tqComparator = new TQComparator(rc, tot_guarant);
- PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
- tqComparator);
- for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
- TempQueuePerPartition q = i.next();
- Resource used = q.getUsed();
-
- if (Resources.greaterThan(rc, tot_guarant, used,
- q.getGuaranteed())) {
- q.idealAssigned = Resources.add(
- q.getGuaranteed(), q.untouchableExtra);
- } else {
- q.idealAssigned = Resources.clone(used);
- }
- Resources.subtractFrom(unassigned, q.idealAssigned);
- // If idealAssigned < (allocated + used + pending), q needs more resources, so
- // add it to the list of underserved queues, ordered by need.
- Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
- if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
- orderedByNeed.add(q);
- }
- }
-
- //assign all cluster resources until no more demand, or no resources are left
- while (!orderedByNeed.isEmpty()
- && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) {
- Resource wQassigned = Resource.newInstance(0, 0);
- // we compute normalizedGuarantees capacity based on currently active
- // queues
- resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
-
- // For each underserved queue (or set of queues if multiple are equally
- // underserved), offer its share of the unassigned resources based on its
- // normalized guarantee. After the offer, if the queue is not satisfied,
- // place it back in the ordered list of queues, recalculating its place
- // in the order of most under-guaranteed to most over-guaranteed. In this
- // way, the most underserved queue(s) are always given resources first.
- Collection<TempQueuePerPartition> underserved =
- getMostUnderservedQueues(orderedByNeed, tqComparator);
- for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
- .hasNext();) {
- TempQueuePerPartition sub = i.next();
- Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
- unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
- Resource wQidle = sub.offer(wQavail, rc, tot_guarant,
- isReservedPreemptionCandidatesSelector);
- Resource wQdone = Resources.subtract(wQavail, wQidle);
-
- if (Resources.greaterThan(rc, tot_guarant,
- wQdone, Resources.none())) {
- // The queue is still asking for more. Put it back in the priority
- // queue, recalculating its order based on need.
- orderedByNeed.add(sub);
- }
- Resources.addTo(wQassigned, wQdone);
- }
- Resources.subtractFrom(unassigned, wQassigned);
- }
+ super(preemptionContext, isReservedPreemptionCandidatesSelector);
}
/**
@@ -263,14 +95,14 @@ public class PreemptableResourceCalculator {
}
// first compute the allocation as a fixpoint based on guaranteed capacity
- computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+ computeFixpointAllocation(tot_guarant, nonZeroGuarQueues, unassigned,
false);
// if any capacity is left unassigned, distributed among zero-guarantee
// queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
if (!zeroGuarQueues.isEmpty()
&& Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
- computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+ computeFixpointAllocation(tot_guarant, zeroGuarQueues, unassigned,
true);
}
@@ -321,13 +153,12 @@ public class PreemptableResourceCalculator {
computeIdealResourceDistribution(rc, root.getChildren(),
totalPreemptionAllowed, root.idealAssigned);
// compute recursively for lower levels and build list of leafs
- for(TempQueuePerPartition t : root.getChildren()) {
+ for (TempQueuePerPartition t : root.getChildren()) {
recursivelyComputeIdealAssignment(t, totalPreemptionAllowed);
}
}
}
-
private void calculateResToObtainByPartitionForLeafQueues(
Set<String> leafQueueNames, Resource clusterResource) {
// Loop all leaf queues
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
index dd33d8f..c74e34e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
@@ -19,10 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -41,7 +47,7 @@ public abstract class PreemptionCandidatesSelector {
* selected candidates.
*
* @param selectedCandidates already selected candidates from previous policies
- * @param clusterResource
+ * @param clusterResource total resource
* @param totalPreemptedResourceAllowed how many resources allowed to be
* preempted in this round
* @return merged selected candidates.
@@ -49,4 +55,28 @@ public abstract class PreemptionCandidatesSelector {
public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource, Resource totalPreemptedResourceAllowed);
+
+  /**
+   * Sorts {@code containers} in place. The primary key is container priority
+   * in the reversed order of the resourcemanager
+   * {@code Priority.Comparator}. Containers with equal priority are ordered
+   * by reversed {@code ContainerId} order.
+   *
+   * @param containers list of containers to sort for.
+   */
+  @VisibleForTesting
+  static void sortContainers(List<RMContainer> containers) {
+    // Build the priority comparator once per sort rather than allocating a
+    // new one on every comparison.
+    final Comparator<Priority> priorityComparator =
+        new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .Comparator();
+    Collections.sort(containers, new Comparator<RMContainer>() {
+      @Override
+      public int compare(RMContainer a, RMContainer b) {
+        // Arguments are swapped on purpose to obtain the reversed order.
+        int priorityComp = priorityComparator.compare(
+            b.getContainer().getPriority(), a.getContainer().getPriority());
+        if (priorityComp != 0) {
+          return priorityComp;
+        }
+        return b.getContainerId().compareTo(a.getContainerId());
+      }
+    });
+  }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org