You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2018/05/25 16:07:55 UTC
hadoop git commit: YARN-8292: Fix the dominant resource preemption
cannot happen when some of the resource vector becomes negative. Contributed
by Wangda Tan.
Repository: hadoop
Updated Branches:
refs/heads/trunk bddfe796f -> 8d5509c68
YARN-8292: Fix the dominant resource preemption cannot happen when some of the resource vector becomes negative. Contributed by Wangda Tan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8d5509c6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8d5509c6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8d5509c6
Branch: refs/heads/trunk
Commit: 8d5509c68156faaa6641f4e747fc9ff80adccf88
Parents: bddfe79
Author: Eric E Payne <er...@oath.com>
Authored: Fri May 25 16:06:09 2018 +0000
Committer: Eric E Payne <er...@oath.com>
Committed: Fri May 25 16:06:09 2018 +0000
----------------------------------------------------------------------
.../resource/DefaultResourceCalculator.java | 15 ++-
.../resource/DominantResourceCalculator.java | 39 ++++---
.../yarn/util/resource/ResourceCalculator.java | 13 ++-
.../hadoop/yarn/util/resource/Resources.java | 5 -
.../AbstractPreemptableResourceCalculator.java | 58 ++++++++---
.../CapacitySchedulerPreemptionUtils.java | 61 +++++++++--
.../capacity/FifoCandidatesSelector.java | 8 +-
.../FifoIntraQueuePreemptionPlugin.java | 4 +-
.../capacity/IntraQueueCandidatesSelector.java | 2 +-
.../capacity/PreemptableResourceCalculator.java | 6 +-
.../monitor/capacity/TempQueuePerPartition.java | 8 +-
...alCapacityPreemptionPolicyMockFramework.java | 30 ++++++
.../TestPreemptionForQueueWithPriorities.java | 103 ++++++++++++-------
...pacityPreemptionPolicyInterQueueWithDRF.java | 60 ++++++++++-
14 files changed, 312 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index 6375c4a..ab6d7f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -136,13 +136,18 @@ public class DefaultResourceCalculator extends ResourceCalculator {
}
@Override
- public boolean isAnyMajorResourceZero(Resource resource) {
- return resource.getMemorySize() == 0f;
- }
-
- @Override
public Resource normalizeDown(Resource r, Resource stepFactor) {
return Resources.createResource(
roundDown((r.getMemorySize()), stepFactor.getMemorySize()));
}
+
+ @Override
+ public boolean isAnyMajorResourceZeroOrNegative(Resource resource) {
+ return resource.getMemorySize() <= 0;
+ }
+
+ @Override
+ public boolean isAnyMajorResourceAboveZero(Resource resource) {
+ return resource.getMemorySize() > 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 6fed23b..2e85ebc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -577,19 +577,6 @@ public class DominantResourceCalculator extends ResourceCalculator {
}
@Override
- public boolean isAnyMajorResourceZero(Resource resource) {
- int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
- for (int i = 0; i < maxLength; i++) {
- ResourceInformation resourceInformation = resource
- .getResourceInformation(i);
- if (resourceInformation.getValue() == 0L) {
- return true;
- }
- }
- return false;
- }
-
- @Override
public Resource normalizeDown(Resource r, Resource stepFactor) {
Resource ret = Resource.newInstance(r);
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
@@ -613,4 +600,30 @@ public class DominantResourceCalculator extends ResourceCalculator {
}
return ret;
}
+
+ @Override
+ public boolean isAnyMajorResourceZeroOrNegative(Resource resource) {
+ int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+ for (int i = 0; i < maxLength; i++) {
+ ResourceInformation resourceInformation = resource.getResourceInformation(
+ i);
+ if (resourceInformation.getValue() <= 0L) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isAnyMajorResourceAboveZero(Resource resource) {
+ int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+ for (int i = 0; i < maxLength; i++) {
+ ResourceInformation resourceInformation = resource.getResourceInformation(
+ i);
+ if (resourceInformation.getValue() > 0) {
+ return true;
+ }
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index 1c42126..51078cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -239,12 +239,12 @@ public abstract class ResourceCalculator {
/**
* Check if resource has any major resource types (which are all NodeManagers
- * included) a zero value.
+ * included) a zero value or negative value.
*
* @param resource resource
* @return returns true if any resource is zero.
*/
- public abstract boolean isAnyMajorResourceZero(Resource resource);
+ public abstract boolean isAnyMajorResourceZeroOrNegative(Resource resource);
/**
* Get resource <code>r</code>and normalize down using step-factor
@@ -257,4 +257,13 @@ public abstract class ResourceCalculator {
* @return resulting normalized resource
*/
public abstract Resource normalizeDown(Resource r, Resource stepFactor);
+
+ /**
+ * Check if resource has any major resource types (which are all NodeManagers
+ * included) has a >0 value.
+ *
+ * @param resource resource
+ * @return returns true if any resource is >0
+ */
+ public abstract boolean isAnyMajorResourceAboveZero(Resource resource);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 1c08844..7826f51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -547,11 +547,6 @@ public class Resources {
return ret;
}
- public static boolean isAnyMajorResourceZero(ResourceCalculator rc,
- Resource resource) {
- return rc.isAnyMajorResourceZero(resource);
- }
-
public static Resource normalizeDown(ResourceCalculator calculator,
Resource resource, Resource factor) {
return calculator.normalizeDown(resource, factor);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
index 2589970..64b3615 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -18,12 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
@@ -32,6 +26,12 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
/**
* Calculate how much resources need to be preempted for each queue,
* will be used by {@link PreemptionCandidatesSelector}.
@@ -40,7 +40,8 @@ public class AbstractPreemptableResourceCalculator {
protected final CapacitySchedulerPreemptionContext context;
protected final ResourceCalculator rc;
- private boolean isReservedPreemptionCandidatesSelector;
+ protected boolean isReservedPreemptionCandidatesSelector;
+ private Resource stepFactor;
static class TQComparator implements Comparator<TempQueuePerPartition> {
private ResourceCalculator rc;
@@ -90,6 +91,11 @@ public class AbstractPreemptableResourceCalculator {
rc = preemptionContext.getResourceCalculator();
this.isReservedPreemptionCandidatesSelector =
isReservedPreemptionCandidatesSelector;
+
+ stepFactor = Resource.newInstance(0, 0);
+ for (ResourceInformation ri : stepFactor.getResources()) {
+ ri.setValue(1);
+ }
}
/**
@@ -122,23 +128,24 @@ public class AbstractPreemptableResourceCalculator {
TQComparator tqComparator = new TQComparator(rc, totGuarant);
PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
tqComparator);
- for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
+ for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext(); ) {
TempQueuePerPartition q = i.next();
Resource used = q.getUsed();
Resource initIdealAssigned;
if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
- initIdealAssigned =
- Resources.add(q.getGuaranteed(), q.untouchableExtra);
- } else {
+ initIdealAssigned = Resources.add(
+ Resources.componentwiseMin(q.getGuaranteed(), q.getUsed()),
+ q.untouchableExtra);
+ } else{
initIdealAssigned = Resources.clone(used);
}
// perform initial assignment
initIdealAssignment(totGuarant, q, initIdealAssigned);
-
Resources.subtractFrom(unassigned, q.idealAssigned);
+
// If idealAssigned < (allocated + used + pending), q needs more
// resources, so
// add it to the list of underserved queues, ordered by need.
@@ -152,7 +159,6 @@ public class AbstractPreemptableResourceCalculator {
// left
while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
unassigned, Resources.none())) {
- Resource wQassigned = Resource.newInstance(0, 0);
// we compute normalizedGuarantees capacity based on currently active
// queues
resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
@@ -166,11 +172,26 @@ public class AbstractPreemptableResourceCalculator {
Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
orderedByNeed, tqComparator);
+ // This value will be used in every round to calculate ideal allocation.
+ // So make a copy to avoid it changed during calculation.
+ Resource dupUnassignedForTheRound = Resources.clone(unassigned);
+
for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
.hasNext();) {
+ if (!rc.isAnyMajorResourceAboveZero(unassigned)) {
+ break;
+ }
+
TempQueuePerPartition sub = i.next();
- Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,
- sub.normalizedGuarantee, Resource.newInstance(1, 1));
+
+ // How much resource we offer to the queue (to increase its ideal_alloc
+ Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
+ dupUnassignedForTheRound,
+ sub.normalizedGuarantee, this.stepFactor);
+
+ // Make sure it is not beyond unassigned
+ wQavail = Resources.componentwiseMin(wQavail, unassigned);
+
Resource wQidle = sub.offer(wQavail, rc, totGuarant,
isReservedPreemptionCandidatesSelector);
Resource wQdone = Resources.subtract(wQavail, wQidle);
@@ -180,9 +201,12 @@ public class AbstractPreemptableResourceCalculator {
// queue, recalculating its order based on need.
orderedByNeed.add(sub);
}
- Resources.addTo(wQassigned, wQdone);
+
+ Resources.subtractFrom(unassigned, wQdone);
+
+ // Make sure unassigned is always larger than 0
+ unassigned = Resources.componentwiseMax(unassigned, Resources.none());
}
- Resources.subtractFrom(unassigned, wQassigned);
}
// Sometimes its possible that, all queues are properly served. So intra
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
index f097e9c..5396d61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -132,6 +133,16 @@ public class CapacitySchedulerPreemptionUtils {
* map to hold preempted containers
* @param totalPreemptionAllowed
* total preemption allowed per round
+ * @param conservativeDRF
+ * should we do conservativeDRF preemption or not.
+ * When true:
+ * stop preempt container when any major resource type <= 0 for to-
+ * preempt.
+ * This is default preemption behavior of intra-queue preemption
+ * When false:
+ * stop preempt container when: all major resource type <= 0 for
+ * to-preempt.
+ * This is default preemption behavior of inter-queue preemption
* @return should we preempt rmContainer. If we should, deduct from
* <code>resourceToObtainByPartition</code>
*/
@@ -140,7 +151,7 @@ public class CapacitySchedulerPreemptionUtils {
Map<String, Resource> resourceToObtainByPartitions,
RMContainer rmContainer, Resource clusterResource,
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
- Resource totalPreemptionAllowed) {
+ Resource totalPreemptionAllowed, boolean conservativeDRF) {
ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
// We will not account resource of a container twice or more
@@ -152,13 +163,49 @@ public class CapacitySchedulerPreemptionUtils {
rmContainer.getAllocatedNode());
Resource toObtainByPartition = resourceToObtainByPartitions
.get(nodePartition);
+ if (null == toObtainByPartition) {
+ return false;
+ }
+
+ // If a toObtain resource type == 0, set it to -1 to avoid 0 resource
+ // type affect following doPreemption check: isAnyMajorResourceZero
+ for (ResourceInformation ri : toObtainByPartition.getResources()) {
+ if (ri.getValue() == 0) {
+ ri.setValue(-1);
+ }
+ }
+
+ if (rc.isAnyMajorResourceAboveZero(toObtainByPartition) && Resources.fitsIn(
+ rc, rmContainer.getAllocatedResource(), totalPreemptionAllowed)) {
+ boolean doPreempt;
+
+ // How much resource left after preemption happen.
+ Resource toObtainAfterPreemption = Resources.subtract(toObtainByPartition,
+ rmContainer.getAllocatedResource());
+
+ if (conservativeDRF) {
+ doPreempt = !rc.isAnyMajorResourceZeroOrNegative(toObtainByPartition);
+ } else {
+ // When we want to do more aggressive preemption, we will do preemption
+ // only if:
+ // - The preempt of the container makes positive contribution to the
+ // to-obtain resource. Positive contribution means any positive
+ // resource type decreases.
+ //
+ // This is example of positive contribution:
+ // * before: <30, 10, 5>, after <20, 10, -10>
+ // But this not positive contribution:
+ // * before: <30, 10, 0>, after <30, 10, -15>
+ doPreempt = Resources.lessThan(rc, clusterResource,
+ Resources
+ .componentwiseMax(toObtainAfterPreemption, Resources.none()),
+ Resources.componentwiseMax(toObtainByPartition, Resources.none()));
+ }
+
+ if (!doPreempt) {
+ return false;
+ }
- if (null != toObtainByPartition
- && Resources.greaterThan(rc, clusterResource, toObtainByPartition,
- Resources.none())
- && Resources.fitsIn(rc, rmContainer.getAllocatedResource(),
- totalPreemptionAllowed)
- && !Resources.isAnyMajorResourceZero(rc, toObtainByPartition)) {
Resources.subtractFrom(toObtainByPartition,
rmContainer.getAllocatedResource());
Resources.subtractFrom(totalPreemptionAllowed,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index 748548a..3b2fcbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -111,7 +111,7 @@ public class FifoCandidatesSelector
.tryPreemptContainerAndDeductResToObtain(rc,
preemptionContext, resToObtainByPartition, c,
clusterResource, selectedCandidates,
- totalPreemptionAllowed);
+ totalPreemptionAllowed, false);
if (!preempted) {
continue;
}
@@ -187,7 +187,7 @@ public class FifoCandidatesSelector
boolean preempted = CapacitySchedulerPreemptionUtils
.tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
resToObtainByPartition, c, clusterResource, preemptMap,
- totalPreemptionAllowed);
+ totalPreemptionAllowed, false);
if (preempted) {
Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
}
@@ -221,7 +221,7 @@ public class FifoCandidatesSelector
// Try to preempt this container
CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
rc, preemptionContext, resToObtainByPartition, c, clusterResource,
- selectedContainers, totalPreemptionAllowed);
+ selectedContainers, totalPreemptionAllowed, false);
if (!preemptionContext.isObserveOnly()) {
preemptionContext.getRMContext().getDispatcher().getEventHandler()
@@ -264,7 +264,7 @@ public class FifoCandidatesSelector
// Try to preempt this container
CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
rc, preemptionContext, resToObtainByPartition, c, clusterResource,
- selectedContainers, totalPreemptionAllowed);
+ selectedContainers, totalPreemptionAllowed, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 1776bd4..40f333f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -278,8 +278,8 @@ public class FifoIntraQueuePreemptionPlugin
// Once unallocated resource is 0, we can stop assigning ideal per app.
if (Resources.lessThanOrEqual(rc, clusterResource,
- queueReassignableResource, Resources.none())
- || Resources.isAnyMajorResourceZero(rc, queueReassignableResource)) {
+ queueReassignableResource, Resources.none()) || rc
+ .isAnyMajorResourceZeroOrNegative(queueReassignableResource)) {
continue;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
index 5b6932e..a91fac7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -230,7 +230,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
boolean ret = CapacitySchedulerPreemptionUtils
.tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
resToObtainByPartition, c, clusterResource, selectedCandidates,
- totalPreemptedResourceAllowed);
+ totalPreemptedResourceAllowed, true);
// Subtract from respective user's resource usage once a container is
// selected for preemption.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index 676c14f..08d834e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -41,8 +41,6 @@ public class PreemptableResourceCalculator
private static final Log LOG =
LogFactory.getLog(PreemptableResourceCalculator.class);
- private boolean isReservedPreemptionCandidatesSelector;
-
/**
* PreemptableResourceCalculator constructor
*
@@ -95,8 +93,8 @@ public class PreemptableResourceCalculator
}
// first compute the allocation as a fixpoint based on guaranteed capacity
- computeFixpointAllocation(tot_guarant, nonZeroGuarQueues, unassigned,
- false);
+ computeFixpointAllocation(tot_guarant, new HashSet<>(nonZeroGuarQueues),
+ unassigned, false);
// if any capacity is left unassigned, distributed among zero-guarantee
// queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 9d8297d..4214acc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -151,7 +151,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
// # This is for leaf queue only.
// max(guaranteed, used) - assigned}
// remain = avail - accepted
- Resource accepted = Resources.min(rc, clusterResource,
+ Resource accepted = Resources.componentwiseMin(
absMaxCapIdealAssignedDelta,
Resources.min(rc, clusterResource, avail, Resources
/*
@@ -186,6 +186,12 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
accepted = acceptedByLocality(rc, accepted);
+ // accept should never be < 0
+ accepted = Resources.componentwiseMax(accepted, Resources.none());
+
+ // or more than offered
+ accepted = Resources.componentwiseMin(accepted, avail);
+
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);
return remain;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index a8e2697..a972584 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.mockito.ArgumentMatcher;
@@ -104,10 +106,32 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
EventHandler<Event> mDisp = null;
ProportionalCapacityPreemptionPolicy policy = null;
Resource clusterResource = null;
+ // Initialize resource map
+ Map<String, ResourceInformation> riMap = new HashMap<>();
+
+ private void resetResourceInformationMap() {
+ // Initialize mandatory resources
+ ResourceInformation memory = ResourceInformation.newInstance(
+ ResourceInformation.MEMORY_MB.getName(),
+ ResourceInformation.MEMORY_MB.getUnits(),
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+ ResourceInformation vcores = ResourceInformation.newInstance(
+ ResourceInformation.VCORES.getName(),
+ ResourceInformation.VCORES.getUnits(),
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+ riMap.put(ResourceInformation.MEMORY_URI, memory);
+ riMap.put(ResourceInformation.VCORES_URI, vcores);
+
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+ }
@SuppressWarnings("unchecked")
@Before
public void setup() {
+ resetResourceInformationMap();
+
org.apache.log4j.Logger.getRootLogger().setLevel(
org.apache.log4j.Level.DEBUG);
@@ -142,6 +166,12 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
partitionToResource = new HashMap<>();
nodeIdToSchedulerNodes = new HashMap<>();
nameToCSQueues = new HashMap<>();
+ clusterResource = Resource.newInstance(0, 0);
+ }
+
+ @After
+ public void cleanup() {
+ resetResourceInformationMap();
}
public void buildEnv(String labelsConfig, String nodesConfig,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
index e9a8116..6a953cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
@@ -20,44 +20,25 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestPreemptionForQueueWithPriorities
extends ProportionalCapacityPreemptionPolicyMockFramework {
- // Initialize resource map
- private Map<String, ResourceInformation> riMap = new HashMap<>();
-
@Before
public void setup() {
-
- // Initialize mandatory resources
- ResourceInformation memory = ResourceInformation.newInstance(
- ResourceInformation.MEMORY_MB.getName(),
- ResourceInformation.MEMORY_MB.getUnits(),
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
- ResourceInformation vcores = ResourceInformation.newInstance(
- ResourceInformation.VCORES.getName(),
- ResourceInformation.VCORES.getUnits(),
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
- riMap.put(ResourceInformation.MEMORY_URI, memory);
- riMap.put(ResourceInformation.VCORES_URI, vcores);
-
- ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
-
+ rc = new DefaultResourceCalculator();
super.setup();
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
}
@@ -340,8 +321,8 @@ public class TestPreemptionForQueueWithPriorities
* - a2 (capacity=60), p=1
* - b (capacity=30), p=1
* - b1 (capacity=50), p=1
- * - b1 (capacity=50), p=2
- * - c (capacity=40), p=2
+ * - b2 (capacity=50), p=2
+ * - c (capacity=40), p=1
* </pre>
*/
String labelsConfig = "=100,true"; // default partition
@@ -349,11 +330,11 @@ public class TestPreemptionForQueueWithPriorities
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100]);" + //root
- "-a(=[30 100 40 50]){priority=1};" + // a
+ "-a(=[29 100 40 50]){priority=1};" + // a
"--a1(=[12 100 20 50]){priority=1};" + // a1
- "--a2(=[18 100 20 50]){priority=1};" + // a2
- "-b(=[30 100 59 50]){priority=1};" + // b
- "--b1(=[15 100 30 50]){priority=1};" + // b1
+ "--a2(=[17 100 20 50]){priority=1};" + // a2
+ "-b(=[31 100 59 50]){priority=1};" + // b
+ "--b1(=[16 100 30 50]){priority=1};" + // b1
"--b2(=[15 100 29 50]){priority=2};" + // b2
"-c(=[40 100 1 30]){priority=1}"; // c
String appsConfig =
@@ -362,7 +343,7 @@ public class TestPreemptionForQueueWithPriorities
"a2\t(1,1,n1,,20,false);" + // app2 in a2
"b1\t(1,1,n1,,30,false);" + // app3 in b1
"b2\t(1,1,n1,,29,false);" + // app4 in b2
- "c\t(1,1,n1,,29,false)"; // app5 in c
+ "c\t(1,1,n1,,1,false)"; // app5 in c
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
@@ -370,16 +351,16 @@ public class TestPreemptionForQueueWithPriorities
// Preemption should first divide capacities between a / b, and b2 should
// get less preemption than b1 (because b2 has higher priority)
- verify(mDisp, times(5)).handle(argThat(
+ verify(mDisp, times(6)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, never()).handle(argThat(
+ verify(mDisp, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, times(15)).handle(argThat(
+ verify(mDisp, times(13)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
- verify(mDisp, times(9)).handle(argThat(
+ verify(mDisp, times(10)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
@@ -426,7 +407,7 @@ public class TestPreemptionForQueueWithPriorities
// Preemption should first divide capacities between a / b, and b1 should
// get less preemption than b2 (because b1 has higher priority)
- verify(mDisp, never()).handle(argThat(
+ verify(mDisp, times(3)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, never()).handle(argThat(
@@ -505,4 +486,56 @@ public class TestPreemptionForQueueWithPriorities
getAppAttemptId(3))));
}
+ @Test
+ public void test3ResourceTypesInterQueuePreemption() throws IOException {
+ rc = new DominantResourceCalculator();
+ when(cs.getResourceCalculator()).thenReturn(rc);
+
+ // Initialize resource map
+ String RESOURCE_1 = "res1";
+ riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
+ ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE));
+
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \ \
+ * a b c
+ * </pre>
+ * A / B / C have 33.3 / 33.3 / 33.4 resources
+ * Total cluster resource have mem=30, cpu=18, GPU=6
+ * A uses mem=6, cpu=3, GPU=3
+ * B uses mem=6, cpu=3, GPU=3
+ * C is asking mem=1,cpu=1,GPU=1
+ *
+ * We expect it can preempt from one of the jobs
+ */
+ String labelsConfig =
+ "=30:18:6,true;";
+ String nodesConfig =
+ "n1= res=30:18:6;"; // n1 is default partition
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[30:18:6 30:18:6 12:12:6 1:1:1]){priority=1};" + //root
+ "-a(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // a
+ "-b(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // b
+ "-c(=[10:6:2 10:6:2 0:0:0 1:1:1]){priority=2}"; // c
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t" // app1 in a1
+ + "(1,2:2:1,n1,,3,false);" +
+ "b\t" // app2 in b2
+ + "(1,2:2:1,n1,,3,false)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ verify(mDisp, times(1)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
index c8a1f0f..14a3a9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
@@ -18,11 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Before;
import org.junit.Test;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import java.io.IOException;
+
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -41,8 +46,7 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
}
@Test
- public void testInterQueuePreemptionWithMultipleResource()
- throws Exception {
+ public void testInterQueuePreemptionWithMultipleResource() throws Exception {
/**
* Queue structure is:
*
@@ -121,4 +125,52 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
-}
+
+ @Test
+ public void test3ResourceTypesInterQueuePreemption() throws IOException {
+ // Initialize resource map
+ String RESOURCE_1 = "res1";
+ riMap.put(RESOURCE_1, ResourceInformation
+ .newInstance(RESOURCE_1, "", 0, ResourceTypes.COUNTABLE, 0,
+ Integer.MAX_VALUE));
+
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+ /*
+ * root
+ * / \ \
+ * a b c
+ *
+ * A / B / C have 33.3 / 33.3 / 33.4 resources
+ * Total cluster resource have mem=30, cpu=18, GPU=6
+ * A uses mem=6, cpu=3, GPU=3
+ * B uses mem=6, cpu=3, GPU=3
+ * C is asking mem=1,cpu=1,GPU=1
+ *
+ * We expect it can preempt from one of the jobs
+ */
+ String labelsConfig = "=30:18:6,true;";
+ String nodesConfig = "n1= res=30:18:6;"; // n1 is default partition
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[30:18:6 30:18:6 12:12:6 1:1:1]);" + //root
+ "-a(=[10:7:2 10:6:3 6:6:3 0:0:0]);" + // a
+ "-b(=[10:6:2 10:6:3 6:6:3 0:0:0]);" + // b
+ "-c(=[10:5:2 10:6:2 0:0:0 1:1:1])"; // c
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t" // app1 in a1
+ + "(1,2:2:1,n1,,3,false);" + "b\t" // app2 in b2
+ + "(1,2:2:1,n1,,3,false)";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ verify(mDisp, times(0)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, times(1)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org