You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2016/12/13 17:58:56 UTC
[2/2] hadoop git commit: YARN-2009. CapacityScheduler: Add
intra-queue preemption for app priority support. (Sunil G via wangda)
YARN-2009. CapacityScheduler: Add intra-queue preemption for app priority support. (Sunil G via wangda)
(cherry-picked from commit 90dd3a8148468ac37a3f2173ad8d45e38bfcb0c9)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a18ae84
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a18ae84
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a18ae84
Branch: refs/heads/branch-2.8
Commit: 6a18ae849faa9becccd66b382296213a6c08842e
Parents: 01b50b3
Author: Wangda Tan <wa...@apache.org>
Authored: Mon Oct 31 15:18:31 2016 -0700
Committer: Eric Payne <ep...@apache.org>
Committed: Mon Dec 12 23:13:08 2016 +0000
----------------------------------------------------------------------
.../AbstractPreemptableResourceCalculator.java | 244 ++++++
.../capacity/AbstractPreemptionEntity.java | 98 +++
.../CapacitySchedulerPreemptionContext.java | 14 +
.../CapacitySchedulerPreemptionUtils.java | 119 ++-
.../capacity/FifoCandidatesSelector.java | 129 +--
.../FifoIntraQueuePreemptionPlugin.java | 459 ++++++++++
.../capacity/IntraQueueCandidatesSelector.java | 238 +++++
.../IntraQueuePreemptionComputePlugin.java | 39 +
.../capacity/PreemptableResourceCalculator.java | 183 +---
.../capacity/PreemptionCandidatesSelector.java | 32 +-
.../ProportionalCapacityPreemptionPolicy.java | 86 +-
.../monitor/capacity/TempAppPerPartition.java | 101 +++
.../monitor/capacity/TempQueuePerPartition.java | 142 ++-
.../CapacitySchedulerConfiguration.java | 31 +
.../scheduler/capacity/LeafQueue.java | 40 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 18 +
...alCapacityPreemptionPolicyMockFramework.java | 126 ++-
...ionalCapacityPreemptionPolicyIntraQueue.java | 868 +++++++++++++++++++
18 files changed, 2552 insertions(+), 415 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
new file mode 100644
index 0000000..8255a30
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Calculate how much resources need to be preempted for each queue,
+ * will be used by {@link PreemptionCandidatesSelector}.
+ */
+public class AbstractPreemptableResourceCalculator {
+
+ protected final CapacitySchedulerPreemptionContext context;
+ protected final ResourceCalculator rc;
+ private boolean isReservedPreemptionCandidatesSelector;
+
+ static class TQComparator implements Comparator<TempQueuePerPartition> {
+ private ResourceCalculator rc;
+ private Resource clusterRes;
+
+ TQComparator(ResourceCalculator rc, Resource clusterRes) {
+ this.rc = rc;
+ this.clusterRes = clusterRes;
+ }
+
+ @Override
+ public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
+ if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
+ return -1;
+ }
+ if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
+ return 1;
+ }
+ return 0;
+ }
+
+ // Calculates idealAssigned / guaranteed
+ // TempQueues with 0 guarantees are always considered the most over
+ // capacity and therefore considered last for resources.
+ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
+ double pctOver = Integer.MAX_VALUE;
+ if (q != null && Resources.greaterThan(rc, clusterRes, q.getGuaranteed(),
+ Resources.none())) {
+ pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
+ q.getGuaranteed());
+ }
+ return (pctOver);
+ }
+ }
+
+ /**
+ * AbstractPreemptableResourceCalculator constructor.
+ *
+ * @param preemptionContext context; also supplies the ResourceCalculator
+ * @param isReservedPreemptionCandidatesSelector
+ * set by the concrete candidate selector implementation and
+ * passed through to TempQueuePerPartition#offer; refer to that
+ * method for details.
+ */
+ public AbstractPreemptableResourceCalculator(
+ CapacitySchedulerPreemptionContext preemptionContext,
+ boolean isReservedPreemptionCandidatesSelector) {
+ context = preemptionContext;
+ rc = preemptionContext.getResourceCalculator();
+ this.isReservedPreemptionCandidatesSelector =
+ isReservedPreemptionCandidatesSelector;
+ }
+
+ /**
+ * Given a set of queues compute the fix-point distribution of unassigned
+ * resources among them. As the pending requests of a queue are exhausted,
+ * the queue is removed from the set and remaining capacity redistributed
+ * among remaining queues. The distribution is weighted based on guaranteed
+ * capacity, unless asked to ignoreGuarantee, in which case resources are
+ * distributed uniformly.
+ *
+ * @param totGuarant
+ * total guaranteed resource
+ * @param qAlloc
+ * collection of child queues
+ * @param unassigned
+ * resource not yet assigned to any queue; reduced in place
+ * @param ignoreGuarantee
+ * if true, split evenly instead of by guarantee.
+ */
+ protected void computeFixpointAllocation(Resource totGuarant,
+ Collection<TempQueuePerPartition> qAlloc, Resource unassigned,
+ boolean ignoreGuarantee) {
+ // Prior to assigning the unused resources, process each queue as follows:
+ // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
+ // Else idealAssigned = current;
+ // Subtract idealAssigned resources from unassigned.
+ // If the queue has all of its needs met (that is, if
+ // idealAssigned >= current + pending), remove it from consideration.
+ // Sort queues from most under-guaranteed to most over-guaranteed.
+ TQComparator tqComparator = new TQComparator(rc, totGuarant);
+ PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
+ tqComparator);
+ for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
+ TempQueuePerPartition q = i.next();
+ Resource used = q.getUsed();
+
+ if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
+ q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);
+ } else {
+ q.idealAssigned = Resources.clone(used);
+ }
+ Resources.subtractFrom(unassigned, q.idealAssigned);
+ // If idealAssigned < (used + pending), q needs more resources, so
+ // add it to the priority queue of underserved queues, which is
+ // ordered by need.
+ Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
+ if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {
+ orderedByNeed.add(q);
+ }
+ }
+
+ // assign all cluster resources until no more demand, or no resources are
+ // left
+ while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
+ unassigned, Resources.none())) {
+ Resource wQassigned = Resource.newInstance(0, 0);
+ // Recompute each queue's normalizedGuarantee over the currently active
+ // queues
+ resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
+
+ // For each underserved queue (or set of queues if multiple are equally
+ // underserved), offer its share of the unassigned resources based on its
+ // normalized guarantee. After the offer, if the queue is not satisfied,
+ // place it back in the ordered list of queues, recalculating its place
+ // in the order of most under-guaranteed to most over-guaranteed. In this
+ // way, the most underserved queue(s) are always given resources first.
+ Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
+ orderedByNeed, tqComparator);
+ for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
+ .hasNext();) {
+ TempQueuePerPartition sub = i.next();
+ Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,
+ sub.normalizedGuarantee, Resource.newInstance(1, 1));
+ Resource wQidle = sub.offer(wQavail, rc, totGuarant,
+ isReservedPreemptionCandidatesSelector);
+ Resource wQdone = Resources.subtract(wQavail, wQidle);
+
+ if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
+ // Part of the offer was used (wQdone > 0), so the queue may still need
+ // more. Put it back in the priority queue, reordered by need.
+ orderedByNeed.add(sub);
+ }
+ Resources.addTo(wQassigned, wQdone);
+ }
+ Resources.subtractFrom(unassigned, wQassigned);
+ }
+
+ // Sometimes it is possible that all queues are properly served, so intra
+ // queue preemption would not attempt any preemption. However, there may
+ // still be imbalances within a queue. Hence make sure every queue left in
+ // orderedByNeed is added to the context's under-served list.
+ while (!orderedByNeed.isEmpty()) {
+ TempQueuePerPartition q1 = orderedByNeed.remove();
+ context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
+ }
+ }
+
+ /**
+ * Sets normalizedGuarantee on each of the given (active) queues.
+ *
+ * @param clusterResource
+ * passed to Resources.divide as its cluster-resource argument
+ * @param queues
+ * the queues to consider
+ * @param ignoreGuar
+ * if true, every queue gets an equal share (1 / queues.size()).
+ */
+ private void resetCapacity(Resource clusterResource,
+ Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
+ Resource activeCap = Resource.newInstance(0, 0);
+
+ if (ignoreGuar) {
+ for (TempQueuePerPartition q : queues) {
+ q.normalizedGuarantee = 1.0f / queues.size();
+ }
+ } else {
+ for (TempQueuePerPartition q : queues) {
+ Resources.addTo(activeCap, q.getGuaranteed());
+ }
+ for (TempQueuePerPartition q : queues) {
+ q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+ q.getGuaranteed(), activeCap);
+ }
+ }
+ }
+
+ // Take the most underserved TempQueue (the one at the head). Collect and
+ // return the list of all queues that have the same idealAssigned
+ // percentage of guaranteed, removing them from orderedByNeed.
+ private Collection<TempQueuePerPartition> getMostUnderservedQueues(
+ PriorityQueue<TempQueuePerPartition> orderedByNeed,
+ TQComparator tqComparator) {
+ ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
+ while (!orderedByNeed.isEmpty()) {
+ TempQueuePerPartition q1 = orderedByNeed.remove();
+ underserved.add(q1);
+
+ // Record underserved queues with the context, in order, for later use
+ context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
+ TempQueuePerPartition q2 = orderedByNeed.peek();
+ // q1's pct of guaranteed won't be larger than q2's. If it's less, stop:
+ // record q2 (if any) with the context and return what has already been
+ // collected. Otherwise, q1's pct of guaranteed == that of q2, so q2 is
+ // added to the underserved list during the next pass.
+ if (q2 == null || tqComparator.compare(q1, q2) < 0) {
+ if (null != q2) {
+ context.addPartitionToUnderServedQueues(q2.queueName, q2.partition);
+ }
+ return underserved;
+ }
+ }
+ return underserved;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
new file mode 100644
index 0000000..dbd1f0a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+
+/**
+ * Abstract temporary data-structure for tracking resource availability,
+ * pending resource need, and current utilization for an app/queue.
+ */
+public class AbstractPreemptionEntity {
+ // The following fields are copied from the scheduler
+ final String queueName;
+
+ protected final Resource current;
+ protected final Resource amUsed;
+ protected final Resource reserved;
+ protected Resource pending;
+
+ // The following fields are set and used by candidate selection policies
+ Resource idealAssigned;
+ Resource toBePreempted;
+ Resource selected;
+ private Resource actuallyToBePreempted;
+ private Resource toBePreemptFromOther;
+
+ AbstractPreemptionEntity(String queueName, Resource usedPerPartition,
+ Resource amUsedPerPartition, Resource reserved,
+ Resource pendingPerPartition) {
+ this.queueName = queueName;
+ this.current = usedPerPartition;
+ this.pending = pendingPerPartition;
+ this.reserved = reserved;
+ this.amUsed = amUsedPerPartition;
+
+ this.idealAssigned = Resource.newInstance(0, 0);
+ this.actuallyToBePreempted = Resource.newInstance(0, 0);
+ this.toBePreempted = Resource.newInstance(0, 0);
+ this.toBePreemptFromOther = Resource.newInstance(0, 0);
+ this.selected = Resource.newInstance(0, 0);
+ }
+
+ public Resource getUsed() {
+ return current;
+ }
+
+ public Resource getUsedDeductAM() {
+ return Resources.subtract(current, amUsed);
+ }
+
+ public Resource getAMUsed() {
+ return amUsed;
+ }
+
+ public Resource getPending() {
+ return pending;
+ }
+
+ public Resource getReserved() {
+ return reserved;
+ }
+
+ public Resource getActuallyToBePreempted() {
+ return actuallyToBePreempted;
+ }
+
+ public void setActuallyToBePreempted(Resource actuallyToBePreempted) {
+ this.actuallyToBePreempted = actuallyToBePreempted;
+ }
+
+ public Resource getToBePreemptFromOther() {
+ return toBePreemptFromOther;
+ }
+
+ public void setToBePreemptFromOther(Resource toBePreemptFromOther) {
+ this.toBePreemptFromOther = toBePreemptFromOther;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
index c52127d..982b1f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
@@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import java.util.Collection;
+import java.util.LinkedHashSet;
import java.util.Set;
interface CapacitySchedulerPreemptionContext {
@@ -49,4 +51,16 @@ interface CapacitySchedulerPreemptionContext {
Set<String> getLeafQueueNames();
Set<String> getAllPartitions();
+
+ int getClusterMaxApplicationPriority();
+
+ Resource getPartitionResource(String partition);
+
+ LinkedHashSet<String> getUnderServedQueuesPerPartition(String partition);
+
+ void addPartitionToUnderServedQueues(String queueName, String partition);
+
+ float getMinimumThresholdForIntraQueuePreemption();
+
+ float getMaxAllowableLimitForIntraQueuePreemption();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
index 42d8730..abad2a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
@@ -19,11 +19,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -40,7 +43,8 @@ public class CapacitySchedulerPreemptionUtils {
continue;
}
- // Only add resToObtainByPartition when actuallyToBePreempted resource >= 0
+ // Only add resToObtainByPartition when actuallyToBePreempted resource >
+ // 0
if (Resources.greaterThan(context.getResourceCalculator(),
clusterResource, qT.getActuallyToBePreempted(), Resources.none())) {
resToObtainByPartition.put(qT.partition,
@@ -57,8 +61,8 @@ public class CapacitySchedulerPreemptionUtils {
return false;
}
- Set<RMContainer> containers = selectedCandidates.get(
- container.getApplicationAttemptId());
+ Set<RMContainer> containers = selectedCandidates
+ .get(container.getApplicationAttemptId());
if (containers == null) {
return false;
}
@@ -70,8 +74,8 @@ public class CapacitySchedulerPreemptionUtils {
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
for (Set<RMContainer> containers : selectedCandidates.values()) {
for (RMContainer c : containers) {
- SchedulerNode schedulerNode = context.getScheduler().getSchedulerNode(
- c.getAllocatedNode());
+ SchedulerNode schedulerNode = context.getScheduler()
+ .getSchedulerNode(c.getAllocatedNode());
if (null == schedulerNode) {
continue;
}
@@ -89,8 +93,113 @@ public class CapacitySchedulerPreemptionUtils {
if (null != res) {
tq.deductActuallyToBePreempted(context.getResourceCalculator(),
tq.totalPartitionResource, res);
+ Collection<TempAppPerPartition> tas = tq.getApps();
+ if (null == tas || tas.isEmpty()) {
+ continue;
+ }
+
+ deductPreemptableResourcePerApp(context, tq.totalPartitionResource,
+ tas, res, partition);
}
}
}
}
+
+ private static void deductPreemptableResourcePerApp(
+ CapacitySchedulerPreemptionContext context,
+ Resource totalPartitionResource, Collection<TempAppPerPartition> tas,
+ Resource res, String partition) {
+ for (TempAppPerPartition ta : tas) {
+ ta.deductActuallyToBePreempted(context.getResourceCalculator(),
+ totalPartitionResource, res, partition);
+ }
+ }
+
+ /**
+ * Decide whether rmContainer should be preempted and, if so, record it.
+ *
+ * @param rc
+ * resource calculator
+ * @param context
+ * preemption context
+ * @param resourceToObtainByPartitions
+ * resource still to obtain, by partition; updated in place
+ * @param rmContainer
+ * candidate container
+ * @param clusterResource
+ * total resource
+ * @param preemptMap
+ * containers already selected, keyed by application attempt
+ * @param totalPreemptionAllowed
+ * total preemption allowed per round; reduced in place
+ * @return true if rmContainer was selected; its resource is then deducted
+ * from <code>resourceToObtainByPartitions</code>
+ */
+ public static boolean tryPreemptContainerAndDeductResToObtain(
+ ResourceCalculator rc, CapacitySchedulerPreemptionContext context,
+ Map<String, Resource> resourceToObtainByPartitions,
+ RMContainer rmContainer, Resource clusterResource,
+ Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+ Resource totalPreemptionAllowed) {
+ ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
+
+ // Never account for the resource of a container more than once
+ if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
+ return false;
+ }
+
+ String nodePartition = getPartitionByNodeId(context,
+ rmContainer.getAllocatedNode());
+ Resource toObtainByPartition = resourceToObtainByPartitions
+ .get(nodePartition);
+
+ if (null != toObtainByPartition
+ && Resources.greaterThan(rc, clusterResource, toObtainByPartition,
+ Resources.none())
+ && Resources.fitsIn(rc, clusterResource,
+ rmContainer.getAllocatedResource(), totalPreemptionAllowed)) {
+ Resources.subtractFrom(toObtainByPartition,
+ rmContainer.getAllocatedResource());
+ Resources.subtractFrom(totalPreemptionAllowed,
+ rmContainer.getAllocatedResource());
+
+ // When there is no more resource to obtain, remove the partition entry.
+ if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
+ Resources.none())) {
+ resourceToObtainByPartitions.remove(nodePartition);
+ }
+
+ // Record the container in preemptMap
+ addToPreemptMap(preemptMap, attemptId, rmContainer);
+ return true;
+ }
+
+ return false;
+ }
+
+ private static String getPartitionByNodeId(
+ CapacitySchedulerPreemptionContext context, NodeId nodeId) {
+ return context.getScheduler().getSchedulerNode(nodeId).getPartition();
+ }
+
+ private static void addToPreemptMap(
+ Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+ ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
+ Set<RMContainer> set = preemptMap.get(appAttemptId);
+ if (null == set) {
+ set = new HashSet<>();
+ preemptMap.put(appAttemptId, set);
+ }
+ set.add(containerToPreempt);
+ }
+
+ private static boolean preemptMapContains(
+ Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+ ApplicationAttemptId attemptId, RMContainer rmContainer) {
+ Set<RMContainer> rmContainers = preemptMap.get(attemptId);
+ if (null == rmContainers) {
+ return false;
+ }
+ return rmContainers.contains(rmContainer);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index a8c62fd..33e4afc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -18,11 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -34,9 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -112,9 +107,11 @@ public class FifoCandidatesSelector
// Skip already selected containers
continue;
}
- boolean preempted = tryPreemptContainerAndDeductResToObtain(
- resToObtainByPartition, c, clusterResource, selectedCandidates,
- totalPreemptionAllowed);
+ boolean preempted = CapacitySchedulerPreemptionUtils
+ .tryPreemptContainerAndDeductResToObtain(rc,
+ preemptionContext, resToObtainByPartition, c,
+ clusterResource, selectedCandidates,
+ totalPreemptionAllowed);
if (!preempted) {
continue;
}
@@ -185,9 +182,10 @@ public class FifoCandidatesSelector
break;
}
- boolean preempted =
- tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
- clusterResource, preemptMap, totalPreemptionAllowed);
+ boolean preempted = CapacitySchedulerPreemptionUtils
+ .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
+ resToObtainByPartition, c, clusterResource, preemptMap,
+ totalPreemptionAllowed);
if (preempted) {
Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
}
@@ -195,68 +193,6 @@ public class FifoCandidatesSelector
skippedAMContainerlist.clear();
}
- private boolean preemptMapContains(
- Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
- ApplicationAttemptId attemptId, RMContainer rmContainer) {
- Set<RMContainer> rmContainers;
- if (null == (rmContainers = preemptMap.get(attemptId))) {
- return false;
- }
- return rmContainers.contains(rmContainer);
- }
-
- /**
- * Return should we preempt rmContainer. If we should, deduct from
- * <code>resourceToObtainByPartition</code>
- */
- private boolean tryPreemptContainerAndDeductResToObtain(
- Map<String, Resource> resourceToObtainByPartitions,
- RMContainer rmContainer, Resource clusterResource,
- Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
- Resource totalPreemptionAllowed) {
- ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
-
- // We will not account resource of a container twice or more
- if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
- return false;
- }
-
- String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode());
- Resource toObtainByPartition =
- resourceToObtainByPartitions.get(nodePartition);
-
- if (null != toObtainByPartition && Resources.greaterThan(rc,
- clusterResource, toObtainByPartition, Resources.none()) && Resources
- .fitsIn(rc, clusterResource, rmContainer.getAllocatedResource(),
- totalPreemptionAllowed)) {
- Resources.subtractFrom(toObtainByPartition,
- rmContainer.getAllocatedResource());
- Resources.subtractFrom(totalPreemptionAllowed,
- rmContainer.getAllocatedResource());
-
- // When we have no more resource need to obtain, remove from map.
- if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
- Resources.none())) {
- resourceToObtainByPartitions.remove(nodePartition);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(this.getClass().getName() + " Marked container=" + rmContainer
- .getContainerId() + " from partition=" + nodePartition + " queue="
- + rmContainer.getQueueName() + " to be preemption candidates");
- }
- // Add to preemptMap
- addToPreemptMap(preemptMap, attemptId, rmContainer);
- return true;
- }
-
- return false;
- }
-
- private String getPartitionByNodeId(NodeId nodeId) {
- return preemptionContext.getScheduler().getSchedulerNode(nodeId)
- .getPartition();
- }
-
/**
* Given a target preemption for a specific application, select containers
* to preempt (after unreserving all reservation for that app).
@@ -268,10 +204,6 @@ public class FifoCandidatesSelector
Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers,
Resource totalPreemptionAllowed) {
ApplicationAttemptId appId = app.getApplicationAttemptId();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Looking at application=" + app.getApplicationAttemptId()
- + " resourceToObtain=" + resToObtainByPartition);
- }
// first drop reserved containers towards rsrcPreempt
List<RMContainer> reservedContainers =
@@ -286,8 +218,9 @@ public class FifoCandidatesSelector
}
// Try to preempt this container
- tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
- clusterResource, selectedContainers, totalPreemptionAllowed);
+ CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
+ rc, preemptionContext, resToObtainByPartition, c, clusterResource,
+ selectedContainers, totalPreemptionAllowed);
if (!preemptionContext.isObserveOnly()) {
preemptionContext.getRMContext().getDispatcher().getEventHandler()
@@ -328,41 +261,9 @@ public class FifoCandidatesSelector
}
// Try to preempt this container
- tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
- clusterResource, selectedContainers, totalPreemptionAllowed);
- }
- }
-
- /**
- * Compare by reversed priority order first, and then reversed containerId
- * order
- * @param containers
- */
- @VisibleForTesting
- static void sortContainers(List<RMContainer> containers){
- Collections.sort(containers, new Comparator<RMContainer>() {
- @Override
- public int compare(RMContainer a, RMContainer b) {
- Comparator<Priority> c = new org.apache.hadoop.yarn.server
- .resourcemanager.resource.Priority.Comparator();
- int priorityComp = c.compare(b.getContainer().getPriority(),
- a.getContainer().getPriority());
- if (priorityComp != 0) {
- return priorityComp;
- }
- return b.getContainerId().compareTo(a.getContainerId());
- }
- });
- }
-
- private void addToPreemptMap(
- Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
- ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
- Set<RMContainer> set;
- if (null == (set = preemptMap.get(appAttemptId))) {
- set = new HashSet<>();
- preemptMap.put(appAttemptId, set);
+ CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
+ rc, preemptionContext, resToObtainByPartition, c, clusterResource,
+ selectedContainers, totalPreemptionAllowed);
}
- set.add(containerToPreempt);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
new file mode 100644
index 0000000..757f567
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * FifoIntraQueuePreemptionPlugin will handle intra-queue preemption for
+ * priority and user-limit.
+ */
+public class FifoIntraQueuePreemptionPlugin
+ implements
+ IntraQueuePreemptionComputePlugin {
+
+ protected final CapacitySchedulerPreemptionContext context;
+ protected final ResourceCalculator rc;
+
+ private static final Log LOG =
+ LogFactory.getLog(FifoIntraQueuePreemptionPlugin.class);
+
+ public FifoIntraQueuePreemptionPlugin(ResourceCalculator rc,
+ CapacitySchedulerPreemptionContext preemptionContext) {
+ this.context = preemptionContext;
+ this.rc = rc;
+ }
+
+ @Override
+ public Map<String, Resource> getResourceDemandFromAppsPerQueue(
+ String queueName, String partition) {
+ // Returns a one-entry map: partition -> resource to obtain from the queue.
+ Map<String, Resource> resToObtainByPartition = new HashMap<>();
+ TempQueuePerPartition tq = context
+ .getQueueByPartition(queueName, partition);
+
+ Collection<TempAppPerPartition> appsOrderedByPriority = tq.getApps();
+ Resource actualPreemptNeeded = resToObtainByPartition.get(partition);
+
+ // The map was created just above, so seed a zero entry for this partition.
+ if (actualPreemptNeeded == null) {
+ actualPreemptNeeded = Resources.createResource(0, 0);
+ resToObtainByPartition.put(partition, actualPreemptNeeded);
+ }
+ // Total up getActuallyToBePreempted() across the queue's temp apps.
+ for (TempAppPerPartition a1 : appsOrderedByPriority) {
+ Resources.addTo(actualPreemptNeeded, a1.getActuallyToBePreempted());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Selected to preempt " + actualPreemptNeeded
+ + " resource from partition:" + partition);
+ }
+ return resToObtainByPartition;
+ }
+
+ @Override
+ public void computeAppsIdealAllocation(Resource clusterResource,
+ Resource partitionBasedResource, TempQueuePerPartition tq,
+ Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+ Resource totalPreemptedResourceAllowed,
+ Resource queueReassignableResource, float maxAllowablePreemptLimit) {
+ // Builds the temp apps of tq and stores them via tq.addAllApps (steps 1-8).
+ // 1. AM used resource is treated as frozen for now, so it is left out of
+ // the preemption calculation. Note that this subtracts in place from the
+ // queueReassignableResource object handed in by the caller.
+ Map<String, Resource> perUserAMUsed = new HashMap<String, Resource>();
+ Resource amUsed = calculateUsedAMResourcesPerQueue(tq.partition,
+ tq.leafQueue, perUserAMUsed);
+ Resources.subtractFrom(queueReassignableResource, amUsed);
+
+ // 2. tq.leafQueue will not be null as we validated it in caller side
+ Collection<FiCaSchedulerApp> apps = tq.leafQueue.getAllApplications();
+
+ // A queue with exactly one app has nobody to preempt from or for.
+ if (apps.size() == 1) {
+ return;
+ }
+
+ // 3. Create all tempApps for internal calculation and return a list from
+ // high priority to low priority order.
+ TAPriorityComparator taComparator = new TAPriorityComparator();
+ PriorityQueue<TempAppPerPartition> orderedByPriority =
+ createTempAppForResCalculation(tq.partition, apps, taComparator);
+
+ // 4. Calculate idealAssigned per app, bounded by the queue's reassignable
+ // resource. Also returns the apps arranged from lower priority to
+ // higher priority.
+ TreeSet<TempAppPerPartition> orderedApps =
+ calculateIdealAssignedResourcePerApp(clusterResource,
+ partitionBasedResource, tq, selectedCandidates,
+ queueReassignableResource, orderedByPriority, perUserAMUsed);
+
+ // 5. Configurable cap on intra-queue preemption: guaranteed capacity times
+ // maxAllowablePreemptLimit, less what is already marked to be preempted
+ // from this queue; zero when that amount is not below the cap.
+ Resource maxIntraQueuePreemptable = Resources.multiply(tq.getGuaranteed(),
+ maxAllowablePreemptLimit);
+ if (Resources.greaterThan(rc, clusterResource, maxIntraQueuePreemptable,
+ tq.getActuallyToBePreempted())) {
+ Resources.subtractFrom(maxIntraQueuePreemptable,
+ tq.getActuallyToBePreempted());
+ } else {
+ maxIntraQueuePreemptable = Resource.newInstance(0, 0);
+ }
+
+ // 6. We have two configurations here, one is intra queue limit and second
+ // one is per-round limit for any time preemption. Take a minimum of these
+ Resource preemptionLimit = Resources.min(rc, clusterResource,
+ maxIntraQueuePreemptable, totalPreemptedResourceAllowed);
+
+ // 7. From lowest priority app onwards, calculate toBePreempted resource
+ // based on demand.
+ calculateToBePreemptedResourcePerApp(clusterResource, orderedApps,
+ preemptionLimit);
+
+ // Save all apps (low to high) to temp queue for further reference
+ tq.addAllApps(orderedApps);
+
+ // 8. Demand could otherwise be served by preempting apps of the same
+ // priority level; such cases are filtered out here.
+ validateOutSameAppPriorityFromDemand(clusterResource,
+ (TreeSet<TempAppPerPartition>) tq.getApps());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition);
+ for (TempAppPerPartition tmpApp : tq.getApps()) {
+ LOG.debug(tmpApp);
+ }
+ }
+ }
+
+ private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
+ TreeSet<TempAppPerPartition> orderedApps, Resource preemptionLimit) {
+ // Walk apps in orderedApps order; each app's share is taken off the limit.
+ for (TempAppPerPartition tmpApp : orderedApps) {
+ if (Resources.lessThanOrEqual(rc, clusterResource, preemptionLimit,
+ Resources.none())
+ || Resources.lessThanOrEqual(rc, clusterResource, tmpApp.getUsed(),
+ Resources.none())) {
+ continue;
+ }
+
+ Resource preemtableFromApp = Resources.subtract(tmpApp.getUsed(),
+ tmpApp.idealAssigned);
+ Resources.subtractFrom(preemtableFromApp, tmpApp.selected);
+ Resources.subtractFrom(preemtableFromApp, tmpApp.getAMUsed());
+
+ // Calculate toBePreempted from apps as follows:
+ // app.preemptable = min(max(app.used - app.ideal - app.selected
+ // - app.amUsed, 0), intra_q_preemptable)
+ tmpApp.toBePreempted = Resources.min(rc, clusterResource, Resources
+ .max(rc, clusterResource, preemtableFromApp, Resources.none()),
+ preemptionLimit);
+
+ preemptionLimit = Resources.subtract(preemptionLimit,
+ tmpApp.toBePreempted);
+ }
+ }
+
+ /**
+ * Algorithm for calculating idealAssigned is as follows:
+ * For each partition:
+ * Q.reassignable = Q.used - Q.actuallyToBePreempted - Q.amUsed;
+ *
+ * # By default set ideal assigned 0 for app.
+ * app.idealAssigned as 0
+ * # get user limit from scheduler, excluding the user's AM usage.
+ * userLimitRes = Q.getUserLimit(userName) - amUsed(userName)
+ *
+ * # all values start at 0
+ * Map<String, Resource> userToAllocated
+ *
+ * # Loop from highest priority to lowest priority app to calculate ideal
+ * for app in sorted-by(priority) {
+ * if Q.reassignable <= 0:
+ * continue; # app keeps idealAssigned 0
+ *
+ * if (user-to-allocated.get(app.user) < userLimitRes) {
+ * idealAssigned = min((userLimitRes - userToAllocated.get(app.user)),
+ * (app.usedDeductAM + app.pending - app.selected))
+ * app.idealAssigned = min(Q.reassignable, idealAssigned)
+ * userToAllocated.get(app.user) += app.idealAssigned;
+ * } else {
+ * // skip this app because user-limit reached
+ * }
+ * Q.reassignable -= app.idealAssigned
+ * }
+ *
+ * @param clusterResource Cluster Resource
+ * @param partitionBasedResource resource per partition
+ * @param tq TempQueue
+ * @param selectedCandidates Already Selected preemption candidates
+ * @param queueReassignableResource Reassignable resource; reduced in place
+ * @param orderedByPriority Queue of temp apps; drained by this method
+ * @param perUserAMUsed AM used resource, keyed by user name
+ * @return List of temp apps ordered from low to high priority
+ */
+ private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
+ Resource clusterResource, Resource partitionBasedResource,
+ TempQueuePerPartition tq,
+ Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+ Resource queueReassignableResource,
+ PriorityQueue<TempAppPerPartition> orderedByPriority,
+ Map<String, Resource> perUserAMUsed) {
+
+ Comparator<TempAppPerPartition> reverseComp = Collections
+ .reverseOrder(new TAPriorityComparator());
+ TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp);
+
+ Map<String, Resource> userIdealAssignedMapping = new HashMap<>();
+ String partition = tq.partition;
+
+ Map<String, Resource> preCalculatedUserLimit =
+ new HashMap<String, Resource>();
+
+ while (!orderedByPriority.isEmpty()) {
+ // Take the head of the priority queue and process it to
+ // calculate idealAssigned per app.
+ TempAppPerPartition tmpApp = orderedByPriority.remove();
+ orderedApps.add(tmpApp);
+
+ // Once reassignable resource is used up, later apps get no ideal assigned.
+ if (Resources.lessThanOrEqual(rc, clusterResource,
+ queueReassignableResource, Resources.none())) {
+ continue;
+ }
+
+ String userName = tmpApp.app.getUser();
+ Resource userLimitResource = preCalculatedUserLimit.get(userName);
+
+ // Compute the user-limit only once per user; later apps reuse the value.
+ if (userLimitResource == null) {
+ userLimitResource = Resources.clone(tq.leafQueue
+ .getUserLimitPerUser(userName, partitionBasedResource, partition));
+
+ Resource amUsed = perUserAMUsed.get(userName);
+ if (null == amUsed) {
+ amUsed = Resources.createResource(0, 0);
+ }
+
+ // The user's AM usage is excluded from the user-limit as well.
+ userLimitResource = Resources.subtract(userLimitResource, amUsed);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Userlimit for user '" + userName + "' is :"
+ + userLimitResource + ", and amUsed is:" + amUsed);
+ }
+
+ preCalculatedUserLimit.put(userName, userLimitResource);
+ }
+
+ Resource idealAssignedForUser = userIdealAssignedMapping.get(userName);
+
+ if (idealAssignedForUser == null) {
+ idealAssignedForUser = Resources.createResource(0, 0);
+ userIdealAssignedMapping.put(userName, idealAssignedForUser);
+ }
+
+ // Calculate total selected container resources from current app.
+ getAlreadySelectedPreemptionCandidatesResource(selectedCandidates,
+ tmpApp, partition);
+
+ // For any app, used+pending gives its idealAssigned. However, it is
+ // bounded by the queue's remaining reassignable resource, so a lower
+ // priority app may end up with 0 when higher priority apps demand more.
+ Resource appIdealAssigned = Resources.add(tmpApp.getUsedDeductAM(),
+ tmpApp.getPending());
+ Resources.subtractFrom(appIdealAssigned, tmpApp.selected);
+
+ if (Resources.lessThan(rc, clusterResource, idealAssignedForUser,
+ userLimitResource)) {
+ appIdealAssigned = Resources.min(rc, clusterResource, appIdealAssigned,
+ Resources.subtract(userLimitResource, idealAssignedForUser));
+ tmpApp.idealAssigned = Resources.clone(Resources.min(rc,
+ clusterResource, queueReassignableResource, appIdealAssigned));
+ Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned);
+ } else {
+ continue;
+ }
+
+ // Also set how much resource is needed by this app from others.
+ Resource appUsedExcludedSelected = Resources
+ .subtract(tmpApp.getUsedDeductAM(), tmpApp.selected);
+ if (Resources.greaterThan(rc, clusterResource, tmpApp.idealAssigned,
+ appUsedExcludedSelected)) {
+ tmpApp.setToBePreemptFromOther(
+ Resources.subtract(tmpApp.idealAssigned, appUsedExcludedSelected));
+ }
+
+ Resources.subtractFrom(queueReassignableResource, tmpApp.idealAssigned);
+ }
+
+ return orderedApps;
+ }
+
+ /*
+ * Earlier selectors may already have picked containers of this app. Sum those
+ * whose node label expression equals the partition into tmpApp.selected.
+ */
+ private void getAlreadySelectedPreemptionCandidatesResource(
+ Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+ TempAppPerPartition tmpApp, String partition) {
+ tmpApp.selected = Resources.createResource(0, 0);
+ Set<RMContainer> containers = selectedCandidates
+ .get(tmpApp.app.getApplicationAttemptId());
+
+ if (containers == null) {
+ return;
+ }
+
+ for (RMContainer cont : containers) {
+ if (partition.equals(cont.getNodeLabelExpression())) {
+ Resources.addTo(tmpApp.selected, cont.getAllocatedResource());
+ }
+ }
+ }
+
+ private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
+ String partition, Collection<FiCaSchedulerApp> apps,
+ TAPriorityComparator taComparator) {
+ PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>(
+ 100, taComparator);
+
+ // have an internal temp app structure to store intermediate data(priority)
+ for (FiCaSchedulerApp app : apps) {
+ // AM resource is read only when the app is not waiting for its AM container.
+ Resource used = app.getAppAttemptResourceUsage().getUsed(partition);
+ Resource amUsed = null;
+ if (!app.isWaitingForAMContainer()) {
+ amUsed = app.getAMResource(partition);
+ }
+ Resource pending = app.getTotalPendingRequestsPerPartition()
+ .get(partition);
+ Resource reserved = app.getAppAttemptResourceUsage()
+ .getReserved(partition);
+ // Any figure that came back null is treated as zero.
+ used = (used == null) ? Resources.createResource(0, 0) : used;
+ amUsed = (amUsed == null) ? Resources.createResource(0, 0) : amUsed;
+ pending = (pending == null) ? Resources.createResource(0, 0) : pending;
+ reserved = (reserved == null) ? Resources.createResource(0, 0) : reserved;
+ // NOTE(review): 'partitions' is populated but never read in this method.
+ HashSet<String> partitions = new HashSet<String>(
+ app.getAppAttemptResourceUsage().getNodePartitionsSet());
+ partitions.addAll(app.getTotalPendingRequestsPerPartition().keySet());
+
+ // Create TempAppPerPartition (from cloned values) for further calculation.
+ TempAppPerPartition tmpApp = new TempAppPerPartition(app,
+ Resources.clone(used), Resources.clone(amUsed),
+ Resources.clone(reserved), Resources.clone(pending));
+
+ // Set ideal allocation of app as 0.
+ tmpApp.idealAssigned = Resources.createResource(0, 0);
+
+ orderedByPriority.add(tmpApp);
+ }
+ return orderedByPriority;
+ }
+
+ /*
+ * Fifo+Priority based preemption policy must not preempt resources for demand
+ * at the same priority level. Such cases are filtered out here.
+ */
+ public void validateOutSameAppPriorityFromDemand(Resource cluster,
+ TreeSet<TempAppPerPartition> appsOrderedfromLowerPriority) {
+
+ TempAppPerPartition[] apps = appsOrderedfromLowerPriority
+ .toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]);
+ if (apps.length <= 0) {
+ return;
+ }
+ // Two cursors: lPriority walks up from index 0, hPriority down from the end.
+ int lPriority = 0;
+ int hPriority = apps.length - 1;
+ // Stop when the cursors meet or the low app's priority is no longer lower.
+ while (lPriority < hPriority
+ && !apps[lPriority].equals(apps[hPriority])
+ && apps[lPriority].getPriority() < apps[hPriority].getPriority()) {
+ Resource toPreemptFromOther = apps[hPriority]
+ .getToBePreemptFromOther();
+ Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted();
+ Resource delta = Resources.subtract(apps[lPriority].toBePreempted,
+ actuallyToPreempt);
+ // Low app has unmatched toBePreempted: move min(demand, delta) across.
+ if (Resources.greaterThan(rc, cluster, delta, Resources.none())) {
+ Resource toPreempt = Resources.min(rc, cluster,
+ toPreemptFromOther, delta);
+
+ apps[hPriority].setToBePreemptFromOther(
+ Resources.subtract(toPreemptFromOther, toPreempt));
+ apps[lPriority].setActuallyToBePreempted(
+ Resources.add(actuallyToPreempt, toPreempt));
+ }
+ // Low app fully matched: advance to the next app from the low end.
+ if (Resources.lessThanOrEqual(rc, cluster,
+ apps[lPriority].toBePreempted,
+ apps[lPriority].getActuallyToBePreempted())) {
+ lPriority++;
+ continue;
+ }
+ // High app's demand is fully served: step to the next app from the high end.
+ if (Resources.equals(apps[hPriority].getToBePreemptFromOther(),
+ Resources.none())) {
+ hPriority--;
+ continue;
+ }
+ }
+ }
+
+ private Resource calculateUsedAMResourcesPerQueue(String partition,
+ LeafQueue leafQueue, Map<String, Resource> perUserAMUsed) {
+ Collection<FiCaSchedulerApp> runningApps = leafQueue.getApplications();
+ Resource amUsed = Resources.createResource(0, 0);
+ // Adds each app's AM resource to its user's entry and to the queue total.
+ for (FiCaSchedulerApp app : runningApps) {
+ Resource userAMResource = perUserAMUsed.get(app.getUser());
+ if (null == userAMResource) {
+ userAMResource = Resources.createResource(0, 0);
+ perUserAMUsed.put(app.getUser(), userAMResource);
+ }
+ // NOTE(review): no isWaitingForAMContainer() check, unlike temp-app setup.
+ Resources.addTo(userAMResource, app.getAMResource(partition));
+ Resources.addTo(amUsed, app.getAMResource(partition));
+ }
+ return amUsed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
new file mode 100644
index 0000000..039b53e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Identifies over utilized resources within a queue and tries to normalize
+ * them to resolve resource allocation anomalies w.r.t priority and user-limit.
+ */
+public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
+
+ @SuppressWarnings("serial")
+ static class TAPriorityComparator
+ implements
+ Serializable,
+ Comparator<TempAppPerPartition> {
+ // Order by Priority.compareTo; equal priorities fall back to application id.
+ @Override
+ public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) {
+ Priority p1 = Priority.newInstance(tq1.getPriority());
+ Priority p2 = Priority.newInstance(tq2.getPriority());
+
+ if (!p1.equals(p2)) {
+ return p1.compareTo(p2);
+ }
+ return tq1.getApplicationId().compareTo(tq2.getApplicationId());
+ }
+ }
+
+ IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null;
+ final CapacitySchedulerPreemptionContext context;
+
+ private static final Log LOG =
+ LogFactory.getLog(IntraQueueCandidatesSelector.class);
+
+ IntraQueueCandidatesSelector(
+ CapacitySchedulerPreemptionContext preemptionContext) {
+ super(preemptionContext);
+ fifoPreemptionComputePlugin = new FifoIntraQueuePreemptionPlugin(rc,
+ preemptionContext);
+ context = preemptionContext;
+ }
+
+ @Override
+ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
+ Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+ Resource clusterResource, Resource totalPreemptedResourceAllowed) {
+
+ // 1. Compute the intra-queue preemption demand of each under-served queue.
+ computeIntraQueuePreemptionDemand(
+ clusterResource, totalPreemptedResourceAllowed, selectedCandidates);
+
+ // 2. Previous selectors (with higher priority) could have already
+ // selected containers. We need to deduct pre-emptable resources
+ // based on already selected candidates.
+ CapacitySchedulerPreemptionUtils
+ .deductPreemptableResourcesBasedSelectedCandidates(preemptionContext,
+ selectedCandidates);
+
+ // 3. Loop through all partitions to select containers for preemption.
+ for (String partition : preemptionContext.getAllPartitions()) {
+ LinkedHashSet<String> queueNames = preemptionContext
+ .getUnderServedQueuesPerPartition(partition);
+
+ // Skip partitions for which no under-served queue set is recorded.
+ if (null == queueNames) {
+ continue;
+ }
+
+ // 4. Iterate from most under-served queue in order.
+ for (String queueName : queueNames) {
+ LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
+ RMNodeLabelsManager.NO_LABEL).leafQueue;
+ // NOTE(review): looked up under NO_LABEL, not 'partition' - confirm intent.
+ // skip if not a leaf queue
+ if (null == leafQueue) {
+ continue;
+ }
+
+ // 5. Calculate the resource to obtain per partition
+ Map<String, Resource> resToObtainByPartition = fifoPreemptionComputePlugin
+ .getResourceDemandFromAppsPerQueue(queueName, partition);
+
+ // 6. Walk the queue's preemption iterator while holding the queue's
+ // monitor, selecting containers until the demand above is met.
+ synchronized (leafQueue) {
+ Iterator<FiCaSchedulerApp> desc = leafQueue.getOrderingPolicy()
+ .getPreemptionIterator();
+ while (desc.hasNext()) {
+ FiCaSchedulerApp app = desc.next();
+ preemptFromLeastStarvedApp(selectedCandidates, clusterResource,
+ totalPreemptedResourceAllowed, resToObtainByPartition,
+ leafQueue, app);
+ }
+ }
+ }
+ }
+
+ return selectedCandidates;
+ }
+
+ private void preemptFromLeastStarvedApp(
+ Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+ Resource clusterResource, Resource totalPreemptedResourceAllowed,
+ Map<String, Resource> resToObtainByPartition, LeafQueue leafQueue,
+ FiCaSchedulerApp app) {
+ // NOTE(review): the leafQueue parameter is not used in this method body.
+ // ToDo: Reuse reservation selector here.
+ // Work on a sorted copy of the app's live containers.
+ List<RMContainer> liveContainers = new ArrayList<>(
+ app.getLiveContainers());
+ sortContainers(liveContainers);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "totalPreemptedResourceAllowed for preemption at this round is :"
+ + totalPreemptedResourceAllowed);
+ }
+
+ for (RMContainer c : liveContainers) {
+
+ // If no demand remains for any partition, stop.
+ if (resToObtainByPartition.isEmpty()) {
+ return;
+ }
+
+ // Skip containers already selected as candidates.
+ if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
+ selectedCandidates)) {
+ continue;
+ }
+
+ // Skip containers that are already marked killable.
+ if (null != preemptionContext.getKillableContainers() && preemptionContext
+ .getKillableContainers().contains(c.getContainerId())) {
+ continue;
+ }
+
+ // Skip AM Container from preemption for now.
+ if (c.isAMContainer()) {
+ continue;
+ }
+
+ // Try to preempt this container
+ CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
+ rc, preemptionContext, resToObtainByPartition, c, clusterResource,
+ selectedCandidates, totalPreemptedResourceAllowed);
+ }
+
+ }
+
+ private void computeIntraQueuePreemptionDemand(Resource clusterResource,
+ Resource totalPreemptedResourceAllowed,
+ Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
+
+ // 1. Iterate through all partitions to calculate demand within each one.
+ for (String partition : context.getAllPartitions()) {
+ LinkedHashSet<String> queueNames = context
+ .getUnderServedQueuesPerPartition(partition);
+
+ if (null == queueNames) {
+ continue;
+ }
+
+ // 2. Fetch the partition's resource once, before looping over its
+ // queues.
+ Resource partitionBasedResource =
+ context.getPartitionResource(partition);
+
+ // 3. loop through all queues corresponding to a partition.
+ for (String queueName : queueNames) {
+ TempQueuePerPartition tq = context.getQueueByPartition(queueName,
+ partition);
+ LeafQueue leafQueue = tq.leafQueue;
+
+ // skip if it is not a leaf queue (leafQueue is null)
+ if (null == leafQueue) {
+ continue;
+ }
+
+ // 4. Consider reassignableResource as (used - actuallyToBePreempted).
+ // This serves as the upper limit when splitting quota among apps.
+ Resource queueReassignableResource = Resources.subtract(tq.getUsed(),
+ tq.getActuallyToBePreempted());
+
+ // 5. Check queue's used capacity. Queues below the configured minimum
+ // threshold are not considered for intra-queue preemption.
+ if (leafQueue.getQueueCapacities().getUsedCapacity(partition) < context
+ .getMinimumThresholdForIntraQueuePreemption()) {
+ continue;
+ }
+
+ // 6. compute the allocation of all apps based on the queue's
+ // reassignable resource
+ fifoPreemptionComputePlugin.computeAppsIdealAllocation(clusterResource,
+ partitionBasedResource, tq, selectedCandidates,
+ totalPreemptedResourceAllowed,
+ queueReassignableResource,
+ context.getMaxAllowableLimitForIntraQueuePreemption());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
new file mode 100644
index 0000000..93ebe65
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+
+interface IntraQueuePreemptionComputePlugin {
+ // Resource to obtain from the given queue's apps, keyed by partition.
+ Map<String, Resource> getResourceDemandFromAppsPerQueue(String queueName,
+ String partition);
+ // Computes per-app ideal allocation and to-be-preempted amounts for tq.
+ void computeAppsIdealAllocation(Resource clusterResource,
+ Resource partitionBasedResource, TempQueuePerPartition tq,
+ Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+ Resource totalPreemptedResourceAllowed, Resource queueTotalUnassigned,
+ float maxAllowablePreemptLimit);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index 103b419..3017e8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -27,61 +27,22 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.PriorityQueue;
import java.util.Set;
/**
* Calculate how much resources need to be preempted for each queue,
* will be used by {@link PreemptionCandidatesSelector}
*/
-public class PreemptableResourceCalculator {
+public class PreemptableResourceCalculator
+ extends
+ AbstractPreemptableResourceCalculator {
private static final Log LOG =
LogFactory.getLog(PreemptableResourceCalculator.class);
- private final CapacitySchedulerPreemptionContext context;
- private final ResourceCalculator rc;
private boolean isReservedPreemptionCandidatesSelector;
- static class TQComparator implements Comparator<TempQueuePerPartition> {
- private ResourceCalculator rc;
- private Resource clusterRes;
-
- TQComparator(ResourceCalculator rc, Resource clusterRes) {
- this.rc = rc;
- this.clusterRes = clusterRes;
- }
-
- @Override
- public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
- if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
- return -1;
- }
- if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
- return 1;
- }
- return 0;
- }
-
- // Calculates idealAssigned / guaranteed
- // TempQueues with 0 guarantees are always considered the most over
- // capacity and therefore considered last for resources.
- private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
- double pctOver = Integer.MAX_VALUE;
- if (q != null && Resources.greaterThan(rc, clusterRes,
- q.getGuaranteed(),
- Resources.none())) {
- pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
- q.getGuaranteed());
- }
- return (pctOver);
- }
- }
-
/**
* PreemptableResourceCalculator constructor
*
@@ -93,136 +54,7 @@ public class PreemptableResourceCalculator {
public PreemptableResourceCalculator(
CapacitySchedulerPreemptionContext preemptionContext,
boolean isReservedPreemptionCandidatesSelector) {
- context = preemptionContext;
- rc = preemptionContext.getResourceCalculator();
- this.isReservedPreemptionCandidatesSelector =
- isReservedPreemptionCandidatesSelector;
- }
-
- /**
- * Computes a normalizedGuaranteed capacity based on active queues
- * @param rc resource calculator
- * @param clusterResource the total amount of resources in the cluster
- * @param queues the list of queues to consider
- */
- private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
- Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
- Resource activeCap = Resource.newInstance(0, 0);
-
- if (ignoreGuar) {
- for (TempQueuePerPartition q : queues) {
- q.normalizedGuarantee = 1.0f / queues.size();
- }
- } else {
- for (TempQueuePerPartition q : queues) {
- Resources.addTo(activeCap, q.getGuaranteed());
- }
- for (TempQueuePerPartition q : queues) {
- q.normalizedGuarantee = Resources.divide(rc, clusterResource,
- q.getGuaranteed(), activeCap);
- }
- }
- }
-
- // Take the most underserved TempQueue (the one on the head). Collect and
- // return the list of all queues that have the same idealAssigned
- // percentage of guaranteed.
- protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
- PriorityQueue<TempQueuePerPartition> orderedByNeed,
- TQComparator tqComparator) {
- ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
- while (!orderedByNeed.isEmpty()) {
- TempQueuePerPartition q1 = orderedByNeed.remove();
- underserved.add(q1);
- TempQueuePerPartition q2 = orderedByNeed.peek();
- // q1's pct of guaranteed won't be larger than q2's. If it's less, then
- // return what has already been collected. Otherwise, q1's pct of
- // guaranteed == that of q2, so add q2 to underserved list during the
- // next pass.
- if (q2 == null || tqComparator.compare(q1,q2) < 0) {
- return underserved;
- }
- }
- return underserved;
- }
-
-
- /**
- * Given a set of queues compute the fix-point distribution of unassigned
- * resources among them. As pending request of a queue are exhausted, the
- * queue is removed from the set and remaining capacity redistributed among
- * remaining queues. The distribution is weighted based on guaranteed
- * capacity, unless asked to ignoreGuarantee, in which case resources are
- * distributed uniformly.
- */
- private void computeFixpointAllocation(ResourceCalculator rc,
- Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
- Resource unassigned, boolean ignoreGuarantee) {
- // Prior to assigning the unused resources, process each queue as follows:
- // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
- // Else idealAssigned = current;
- // Subtract idealAssigned resources from unassigned.
- // If the queue has all of its needs met (that is, if
- // idealAssigned >= current + pending), remove it from consideration.
- // Sort queues from most under-guaranteed to most over-guaranteed.
- TQComparator tqComparator = new TQComparator(rc, tot_guarant);
- PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
- tqComparator);
- for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
- TempQueuePerPartition q = i.next();
- Resource used = q.getUsed();
-
- if (Resources.greaterThan(rc, tot_guarant, used,
- q.getGuaranteed())) {
- q.idealAssigned = Resources.add(
- q.getGuaranteed(), q.untouchableExtra);
- } else {
- q.idealAssigned = Resources.clone(used);
- }
- Resources.subtractFrom(unassigned, q.idealAssigned);
- // If idealAssigned < (allocated + used + pending), q needs more resources, so
- // add it to the list of underserved queues, ordered by need.
- Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
- if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
- orderedByNeed.add(q);
- }
- }
-
- //assign all cluster resources until no more demand, or no resources are left
- while (!orderedByNeed.isEmpty()
- && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) {
- Resource wQassigned = Resource.newInstance(0, 0);
- // we compute normalizedGuarantees capacity based on currently active
- // queues
- resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
-
- // For each underserved queue (or set of queues if multiple are equally
- // underserved), offer its share of the unassigned resources based on its
- // normalized guarantee. After the offer, if the queue is not satisfied,
- // place it back in the ordered list of queues, recalculating its place
- // in the order of most under-guaranteed to most over-guaranteed. In this
- // way, the most underserved queue(s) are always given resources first.
- Collection<TempQueuePerPartition> underserved =
- getMostUnderservedQueues(orderedByNeed, tqComparator);
- for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
- .hasNext();) {
- TempQueuePerPartition sub = i.next();
- Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
- unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
- Resource wQidle = sub.offer(wQavail, rc, tot_guarant,
- isReservedPreemptionCandidatesSelector);
- Resource wQdone = Resources.subtract(wQavail, wQidle);
-
- if (Resources.greaterThan(rc, tot_guarant,
- wQdone, Resources.none())) {
- // The queue is still asking for more. Put it back in the priority
- // queue, recalculating its order based on need.
- orderedByNeed.add(sub);
- }
- Resources.addTo(wQassigned, wQdone);
- }
- Resources.subtractFrom(unassigned, wQassigned);
- }
+ super(preemptionContext, isReservedPreemptionCandidatesSelector);
}
/**
@@ -263,14 +95,14 @@ public class PreemptableResourceCalculator {
}
// first compute the allocation as a fixpoint based on guaranteed capacity
- computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+ computeFixpointAllocation(tot_guarant, nonZeroGuarQueues, unassigned,
false);
// if any capacity is left unassigned, distributed among zero-guarantee
// queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
if (!zeroGuarQueues.isEmpty()
&& Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
- computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+ computeFixpointAllocation(tot_guarant, zeroGuarQueues, unassigned,
true);
}
@@ -321,13 +153,12 @@ public class PreemptableResourceCalculator {
computeIdealResourceDistribution(rc, root.getChildren(),
totalPreemptionAllowed, root.idealAssigned);
// compute recursively for lower levels and build list of leafs
- for(TempQueuePerPartition t : root.getChildren()) {
+ for (TempQueuePerPartition t : root.getChildren()) {
recursivelyComputeIdealAssignment(t, totalPreemptionAllowed);
}
}
}
-
private void calculateResToObtainByPartitionForLeafQueues(
Set<String> leafQueueNames, Resource clusterResource) {
// Loop all leaf queues
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
index dd33d8f..c74e34e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
@@ -19,10 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -41,7 +47,7 @@ public abstract class PreemptionCandidatesSelector {
* selected candidates.
*
* @param selectedCandidates already selected candidates from previous policies
- * @param clusterResource
+ * @param clusterResource total resource
* @param totalPreemptedResourceAllowed how many resources allowed to be
* preempted in this round
* @return merged selected candidates.
@@ -49,4 +55,28 @@ public abstract class PreemptionCandidatesSelector {
public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource, Resource totalPreemptedResourceAllowed);
+
+ /**
+  * Sorts the list in place by reversed priority order first, and then by
+  * reversed containerId order.
+  *
+  * @param containers list of containers to sort.
+  */
+ @VisibleForTesting
+ static void sortContainers(List<RMContainer> containers) {
+   // Built once per sort, not per comparison (assumed stateless - confirm).
+   final Comparator<Priority> priorityOrder = new org.apache.hadoop.yarn.server
+       .resourcemanager.resource.Priority.Comparator();
+   Collections.sort(containers, new Comparator<RMContainer>() {
+     @Override
+     public int compare(RMContainer a, RMContainer b) {
+       // Operands are passed as (b, a) to reverse the order.
+       int priorityComp = priorityOrder.compare(
+           b.getContainer().getPriority(), a.getContainer().getPriority());
+       return priorityComp != 0 ? priorityComp
+           : b.getContainerId().compareTo(a.getContainerId());
+     }
+   });
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org