You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2018/02/23 02:12:23 UTC
hadoop git commit: YARN-7934. [GQ] Refactor preemption calculators to
allow overriding for Federation Global Algos. (Contributed by curino)
Repository: hadoop
Updated Branches:
refs/heads/trunk 95904f6b3 -> 514794e1a
YARN-7934. [GQ] Refactor preemption calculators to allow overriding for Federation Global Algos. (Contributed by curino)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/514794e1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/514794e1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/514794e1
Branch: refs/heads/trunk
Commit: 514794e1a5a39ca61de3981d53a05547ae17f5e4
Parents: 95904f6
Author: Carlo Curino <cu...@apache.org>
Authored: Thu Feb 22 18:12:12 2018 -0800
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Feb 22 18:12:12 2018 -0800
----------------------------------------------------------------------
.../AbstractPreemptableResourceCalculator.java | 38 +++++--
.../capacity/AbstractPreemptionEntity.java | 4 +
.../CapacitySchedulerPreemptionContext.java | 6 +-
.../capacity/PreemptableResourceCalculator.java | 21 ++--
.../monitor/capacity/TempQueuePerPartition.java | 106 +++++++++++++++----
.../webapp/dao/ResourceInfo.java | 5 +-
6 files changed, 139 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/514794e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
index 5196831..2589970 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -18,6 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
@@ -26,12 +32,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
/**
* Calculate how much resources need to be preempted for each queue,
* will be used by {@link PreemptionCandidatesSelector}.
@@ -126,11 +126,18 @@ public class AbstractPreemptableResourceCalculator {
TempQueuePerPartition q = i.next();
Resource used = q.getUsed();
+ Resource initIdealAssigned;
if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
- q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);
+ initIdealAssigned =
+ Resources.add(q.getGuaranteed(), q.untouchableExtra);
} else {
- q.idealAssigned = Resources.clone(used);
+ initIdealAssigned = Resources.clone(used);
}
+
+ // perform initial assignment
+ initIdealAssignment(totGuarant, q, initIdealAssigned);
+
+
Resources.subtractFrom(unassigned, q.idealAssigned);
// If idealAssigned < (allocated + used + pending), q needs more
// resources, so
@@ -188,6 +195,21 @@ public class AbstractPreemptableResourceCalculator {
}
}
+
+ /**
+ * This method is visible to allow sub-classes to override the initialization
+ * behavior.
+ *
+ * @param totGuarant total resources (useful for {@code ResourceCalculator}
+ * operations)
+ * @param q the {@code TempQueuePerPartition} being initialized
+ * @param initIdealAssigned the proposed initialization value.
+ */
+ protected void initIdealAssignment(Resource totGuarant,
+ TempQueuePerPartition q, Resource initIdealAssigned) {
+ q.idealAssigned = initIdealAssigned;
+ }
+
/**
* Computes a normalizedGuaranteed capacity based on active queues.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/514794e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
index dbd1f0a..cb4d7af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
@@ -59,6 +59,10 @@ public class AbstractPreemptionEntity {
this.selected = Resource.newInstance(0, 0);
}
+ public String getQueueName() {
+ return queueName;
+ }
+
public Resource getUsed() {
return current;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/514794e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
index d6f3f6c..098acdd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
@@ -30,7 +30,11 @@ import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
-interface CapacitySchedulerPreemptionContext {
+/**
+ * This interface provides context for the calculation of ideal allocation
+ * and preemption for the {@code CapacityScheduler}.
+ */
+public interface CapacitySchedulerPreemptionContext {
CapacityScheduler getScheduler();
TempQueuePerPartition getQueueByPartition(String queueName,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/514794e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index 907785e..2d2cdf6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -26,11 +31,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
/**
* Calculate how much resources need to be preempted for each queue,
* will be used by {@link PreemptionCandidatesSelector}
@@ -70,7 +70,7 @@ public class PreemptableResourceCalculator
* @param totalPreemptionAllowed total amount of preemption we allow
* @param tot_guarant the amount of capacity assigned to this pool of queues
*/
- private void computeIdealResourceDistribution(ResourceCalculator rc,
+ protected void computeIdealResourceDistribution(ResourceCalculator rc,
List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
Resource tot_guarant) {
@@ -138,14 +138,13 @@ public class PreemptableResourceCalculator
/**
* This method recursively computes the ideal assignment of resources to each
* level of the hierarchy. This ensures that leafs that are over-capacity but
- * with parents within capacity will not be preemptionCandidates. Preemptions are allowed
- * within each subtree according to local over/under capacity.
+ * with parents within capacity will not be preemptionCandidates. Preemptions
+ * are allowed within each subtree according to local over/under capacity.
*
* @param root the root of the cloned queue hierachy
* @param totalPreemptionAllowed maximum amount of preemption allowed
- * @return a list of leaf queues updated with preemption targets
*/
- private void recursivelyComputeIdealAssignment(
+ protected void recursivelyComputeIdealAssignment(
TempQueuePerPartition root, Resource totalPreemptionAllowed) {
if (root.getChildren() != null &&
root.getChildren().size() > 0) {
@@ -242,7 +241,7 @@ public class PreemptableResourceCalculator
// compute the ideal distribution of resources among queues
// updates cloned queues state accordingly
- tRoot.idealAssigned = tRoot.getGuaranteed();
+ tRoot.initializeRootIdealWithGuarangeed();
recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/514794e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index fdeee52..9d8297d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -18,22 +18,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
/**
* Temporary data-structure tracking resource availability, pending resource
* need, current utilization. This is per-queue-per-partition data structure
@@ -74,7 +72,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
// idealAssigned, used etc.
Map<String, TempUserPerPartition> usersPerPartition = new LinkedHashMap<>();
- TempQueuePerPartition(String queueName, Resource current,
+ @SuppressWarnings("checkstyle:parameternumber")
+ public TempQueuePerPartition(String queueName, Resource current,
boolean preemptionDisabled, String partition, Resource killable,
float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
Resource reserved, CSQueue queue, Resource effMinRes,
@@ -94,7 +93,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
pendingDeductReserved = Resources.createResource(0);
}
- if (ParentQueue.class.isAssignableFrom(queue.getClass())) {
+ if (queue != null && ParentQueue.class.isAssignableFrom(queue.getClass())) {
parentQueue = (ParentQueue) queue;
}
@@ -179,15 +178,14 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
// Because for a satisfied parent queue, it could have some under-utilized
// leaf queues. Such under-utilized leaf queue could preemption resources
// from over-utilized leaf queue located at other hierarchies.
- if (null == children || children.isEmpty()) {
- Resource maxOfGuranteedAndUsedDeductAssigned = Resources.subtract(
- Resources.max(rc, clusterResource, getUsed(), getGuaranteed()),
- idealAssigned);
- maxOfGuranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource,
- maxOfGuranteedAndUsedDeductAssigned, Resources.none());
- accepted = Resources.min(rc, clusterResource, accepted,
- maxOfGuranteedAndUsedDeductAssigned);
- }
+
+ accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted);
+
+ // accepted so far contains the "quota acceptable" amount, we now filter by
+ // locality acceptable
+
+ accepted = acceptedByLocality(rc, accepted);
+
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);
return remain;
@@ -329,4 +327,72 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
public Map<String, TempUserPerPartition> getUsersPerPartition() {
return usersPerPartition;
}
+
+ public void setPending(Resource pending) {
+ this.pending = pending;
+ }
+
+ public Resource getIdealAssigned() {
+ return idealAssigned;
+ }
+
+ public String toGlobalString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n").append(toString());
+ for (TempQueuePerPartition c : children) {
+ sb.append(c.toGlobalString());
+ }
+ return sb.toString();
+ }
+
+ /**
+ * This method is visible to allow sub-classes to override the behavior,
+ * specifically to take into account locality-based limitations of how much
+ * the queue can consume.
+ *
+ * @param rc the ResourceCalculator to be used.
+ * @param offered the input amount of Resource offered to this queue.
+ *
+ * @return the subset of Resource(s) that the queue can consume after
+ * accounting for locality effects.
+ */
+ protected Resource acceptedByLocality(ResourceCalculator rc,
+ Resource offered) {
+ return offered;
+ }
+
+ /**
+ * This method is visible to allow sub-classes to override the behavior,
+ * specifically for federation purposes we do not want to cap resources as it
+ * is done here.
+ *
+ * @param rc the {@code ResourceCalculator} to be used
+ * @param clusterResource the total cluster resources
+ * @param offered the resources offered to this queue
+ * @return the amount of resources accepted after considering max and
+ * deducting assigned.
+ */
+ protected Resource filterByMaxDeductAssigned(ResourceCalculator rc,
+ Resource clusterResource, Resource offered) {
+ if (null == children || children.isEmpty()) {
+ Resource maxOfGuranteedAndUsedDeductAssigned = Resources.subtract(
+ Resources.max(rc, clusterResource, getUsed(), getGuaranteed()),
+ idealAssigned);
+ maxOfGuranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource,
+ maxOfGuranteedAndUsedDeductAssigned, Resources.none());
+ offered = Resources.min(rc, clusterResource, offered,
+ maxOfGuranteedAndUsedDeductAssigned);
+ }
+ return offered;
+ }
+
+ /**
+ * This method is visible to allow sub-classes to override the behavior,
+ * specifically for federation purposes we need to initialize per-sub-cluster
+ * roots as well as the global one.
+ */
+ protected void initializeRootIdealWithGuarangeed() {
+ idealAssigned = Resources.clone(getGuaranteed());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/514794e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java
index 5bed936..9a335e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java
@@ -70,7 +70,7 @@ public class ResourceInfo {
@Override
public String toString() {
- return resources.toString();
+ return getResource().toString();
}
public void setMemory(int memory) {
@@ -90,6 +90,9 @@ public class ResourceInfo {
}
public Resource getResource() {
+ if (resources == null) {
+ resources = Resource.newInstance(memory, vCores);
+ }
return Resource.newInstance(resources);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org