You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/07/02 20:32:39 UTC
[22/45] hadoop git commit: YARN-8379. Improve balancing resources in
already satisfied queues by using Capacity Scheduler preemption. Contributed
by Zian Chen.
YARN-8379. Improve balancing resources in already satisfied queues by using Capacity Scheduler preemption. Contributed by Zian Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/29119430
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/29119430
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/29119430
Branch: refs/heads/HDDS-4
Commit: 291194302cc1a875d6d94ea93cf1184a3f1fc2cc
Parents: 384764c
Author: Sunil G <su...@apache.org>
Authored: Thu Jun 28 10:23:31 2018 -0700
Committer: Sunil G <su...@apache.org>
Committed: Thu Jun 28 10:23:31 2018 -0700
----------------------------------------------------------------------
.../AbstractPreemptableResourceCalculator.java | 21 +-
.../CapacitySchedulerPreemptionContext.java | 2 +
.../CapacitySchedulerPreemptionUtils.java | 23 +-
.../capacity/FifoCandidatesSelector.java | 45 ++--
.../capacity/IntraQueueCandidatesSelector.java | 9 +-
.../capacity/PreemptableResourceCalculator.java | 7 +-
.../capacity/PreemptionCandidatesSelector.java | 11 +
.../ProportionalCapacityPreemptionPolicy.java | 129 +++++++---
...QueuePriorityContainerCandidateSelector.java | 16 +-
.../ReservedContainerCandidatesSelector.java | 16 +-
.../monitor/capacity/TempQueuePerPartition.java | 8 +-
.../CapacitySchedulerConfiguration.java | 17 ++
.../TestPreemptionForQueueWithPriorities.java | 58 +++++
...apacityPreemptionPolicyPreemptToBalance.java | 254 +++++++++++++++++++
...TestCapacitySchedulerSurgicalPreemption.java | 111 ++++++++
15 files changed, 637 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
index 64b3615..5b8360a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -42,6 +42,7 @@ public class AbstractPreemptableResourceCalculator {
protected final ResourceCalculator rc;
protected boolean isReservedPreemptionCandidatesSelector;
private Resource stepFactor;
+ private boolean allowQueuesBalanceAfterAllQueuesSatisfied;
static class TQComparator implements Comparator<TempQueuePerPartition> {
private ResourceCalculator rc;
@@ -83,15 +84,28 @@ public class AbstractPreemptableResourceCalculator {
* this will be set by different implementation of candidate
* selectors, please refer to TempQueuePerPartition#offer for
* details.
+ * @param allowQueuesBalanceAfterAllQueuesSatisfied
+ * Should resources be preempted from an over-served queue when the
+ * requesting queues are all at or over their guarantees?
+ * For example, suppose there are 10 queues under root, each with a
+ * guaranteed resource of 10%.
+ * Assume only two queues are using resources: queueA uses 10% and
+ * queueB uses 90%. Every queue's guarantee is met, but this is not
+ * fair to queueA.
+ * This behavior is configurable. By default it is
+ * not allowed.
+ *
*/
public AbstractPreemptableResourceCalculator(
CapacitySchedulerPreemptionContext preemptionContext,
- boolean isReservedPreemptionCandidatesSelector) {
+ boolean isReservedPreemptionCandidatesSelector,
+ boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
context = preemptionContext;
rc = preemptionContext.getResourceCalculator();
this.isReservedPreemptionCandidatesSelector =
isReservedPreemptionCandidatesSelector;
-
+ this.allowQueuesBalanceAfterAllQueuesSatisfied =
+ allowQueuesBalanceAfterAllQueuesSatisfied;
stepFactor = Resource.newInstance(0, 0);
for (ResourceInformation ri : stepFactor.getResources()) {
ri.setValue(1);
@@ -193,7 +207,8 @@ public class AbstractPreemptableResourceCalculator {
wQavail = Resources.componentwiseMin(wQavail, unassigned);
Resource wQidle = sub.offer(wQavail, rc, totGuarant,
- isReservedPreemptionCandidatesSelector);
+ isReservedPreemptionCandidatesSelector,
+ allowQueuesBalanceAfterAllQueuesSatisfied);
Resource wQdone = Resources.subtract(wQavail, wQidle);
if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
index 098acdd..7985296 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
@@ -70,6 +70,8 @@ public interface CapacitySchedulerPreemptionContext {
float getMaxAllowableLimitForIntraQueuePreemption();
+ long getDefaultMaximumKillWaitTimeout();
+
@Unstable
IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
index 690eb02..ed50eff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
@@ -151,6 +151,7 @@ public class CapacitySchedulerPreemptionUtils {
Map<String, Resource> resourceToObtainByPartitions,
RMContainer rmContainer, Resource clusterResource,
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+ Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
Resource totalPreemptionAllowed, boolean conservativeDRF) {
ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
@@ -218,7 +219,7 @@ public class CapacitySchedulerPreemptionUtils {
}
// Add to preemptMap
- addToPreemptMap(preemptMap, attemptId, rmContainer);
+ addToPreemptMap(preemptMap, curCandidates, attemptId, rmContainer);
return true;
}
@@ -230,15 +231,23 @@ public class CapacitySchedulerPreemptionUtils {
return context.getScheduler().getSchedulerNode(nodeId).getPartition();
}
- private static void addToPreemptMap(
+ protected static void addToPreemptMap(
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+ Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
- Set<RMContainer> set = preemptMap.get(appAttemptId);
- if (null == set) {
- set = new HashSet<>();
- preemptMap.put(appAttemptId, set);
+ Set<RMContainer> setForToPreempt = preemptMap.get(appAttemptId);
+ Set<RMContainer> setForCurCandidates = curCandidates.get(appAttemptId);
+ if (null == setForToPreempt) {
+ setForToPreempt = new HashSet<>();
+ preemptMap.put(appAttemptId, setForToPreempt);
}
- set.add(containerToPreempt);
+ setForToPreempt.add(containerToPreempt);
+
+ if (null == setForCurCandidates) {
+ setForCurCandidates = new HashSet<>();
+ curCandidates.put(appAttemptId, setForCurCandidates);
+ }
+ setForCurCandidates.add(containerToPreempt);
}
private static boolean preemptMapContains(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index 3b2fcbb..c2735f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -42,19 +43,25 @@ public class FifoCandidatesSelector
private static final Log LOG =
LogFactory.getLog(FifoCandidatesSelector.class);
private PreemptableResourceCalculator preemptableAmountCalculator;
+ private boolean allowQueuesBalanceAfterAllQueuesSatisfied;
FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext,
- boolean includeReservedResource) {
+ boolean includeReservedResource,
+ boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
super(preemptionContext);
+ this.allowQueuesBalanceAfterAllQueuesSatisfied =
+ allowQueuesBalanceAfterAllQueuesSatisfied;
preemptableAmountCalculator = new PreemptableResourceCalculator(
- preemptionContext, includeReservedResource);
+ preemptionContext, includeReservedResource,
+ allowQueuesBalanceAfterAllQueuesSatisfied);
}
@Override
public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource, Resource totalPreemptionAllowed) {
+ Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();
// Calculate how much resources we need to preempt
preemptableAmountCalculator.computeIdealAllocation(clusterResource,
totalPreemptionAllowed);
@@ -110,7 +117,7 @@ public class FifoCandidatesSelector
boolean preempted = CapacitySchedulerPreemptionUtils
.tryPreemptContainerAndDeductResToObtain(rc,
preemptionContext, resToObtainByPartition, c,
- clusterResource, selectedCandidates,
+ clusterResource, selectedCandidates, curCandidates,
totalPreemptionAllowed, false);
if (!preempted) {
continue;
@@ -134,7 +141,7 @@ public class FifoCandidatesSelector
preemptFrom(fc, clusterResource, resToObtainByPartition,
skippedAMContainerlist, skippedAMSize, selectedCandidates,
- totalPreemptionAllowed);
+ curCandidates, totalPreemptionAllowed);
}
// Can try preempting AMContainers (still saving atmost
@@ -145,15 +152,15 @@ public class FifoCandidatesSelector
leafQueue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL),
leafQueue.getMaxAMResourcePerQueuePercent());
- preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
- resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
- totalPreemptionAllowed);
+ preemptAMContainers(clusterResource, selectedCandidates, curCandidates,
+ skippedAMContainerlist, resToObtainByPartition, skippedAMSize,
+ maxAMCapacityForThisQueue, totalPreemptionAllowed);
} finally {
leafQueue.getReadLock().unlock();
}
}
- return selectedCandidates;
+ return curCandidates;
}
/**
@@ -169,6 +176,7 @@ public class FifoCandidatesSelector
*/
private void preemptAMContainers(Resource clusterResource,
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+ Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
List<RMContainer> skippedAMContainerlist,
Map<String, Resource> resToObtainByPartition, Resource skippedAMSize,
Resource maxAMCapacityForThisQueue, Resource totalPreemptionAllowed) {
@@ -187,7 +195,7 @@ public class FifoCandidatesSelector
boolean preempted = CapacitySchedulerPreemptionUtils
.tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
resToObtainByPartition, c, clusterResource, preemptMap,
- totalPreemptionAllowed, false);
+ curCandidates, totalPreemptionAllowed, false);
if (preempted) {
Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
}
@@ -203,6 +211,7 @@ public class FifoCandidatesSelector
Resource clusterResource, Map<String, Resource> resToObtainByPartition,
List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers,
+ Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
Resource totalPreemptionAllowed) {
ApplicationAttemptId appId = app.getApplicationAttemptId();
@@ -219,9 +228,10 @@ public class FifoCandidatesSelector
}
// Try to preempt this container
- CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
- rc, preemptionContext, resToObtainByPartition, c, clusterResource,
- selectedContainers, totalPreemptionAllowed, false);
+ CapacitySchedulerPreemptionUtils
+ .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
+ resToObtainByPartition, c, clusterResource, selectedContainers,
+ curCandidates, totalPreemptionAllowed, false);
if (!preemptionContext.isObserveOnly()) {
preemptionContext.getRMContext().getDispatcher().getEventHandler()
@@ -262,9 +272,14 @@ public class FifoCandidatesSelector
}
// Try to preempt this container
- CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
- rc, preemptionContext, resToObtainByPartition, c, clusterResource,
- selectedContainers, totalPreemptionAllowed, false);
+ CapacitySchedulerPreemptionUtils
+ .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
+ resToObtainByPartition, c, clusterResource, selectedContainers,
+ curCandidates, totalPreemptionAllowed, false);
}
}
+
+ public boolean getAllowQueuesBalanceAfterAllQueuesSatisfied() {
+ return allowQueuesBalanceAfterAllQueuesSatisfied;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
index 8ab9507..c52fd95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -122,7 +122,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource, Resource totalPreemptedResourceAllowed) {
-
+ Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();
// 1. Calculate the abnormality within each queue one by one.
computeIntraQueuePreemptionDemand(
clusterResource, totalPreemptedResourceAllowed, selectedCandidates);
@@ -182,7 +182,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
leafQueue.getReadLock().lock();
for (FiCaSchedulerApp app : apps) {
preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates,
- clusterResource, totalPreemptedResourceAllowed,
+ curCandidates, clusterResource, totalPreemptedResourceAllowed,
resToObtainByPartition, rollingResourceUsagePerUser);
}
} finally {
@@ -191,7 +191,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
}
}
- return selectedCandidates;
+ return curCandidates;
}
private void initializeUsageAndUserLimitForCompute(Resource clusterResource,
@@ -211,6 +211,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
FiCaSchedulerApp app,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+ Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
Resource clusterResource, Resource totalPreemptedResourceAllowed,
Map<String, Resource> resToObtainByPartition,
Map<String, Resource> rollingResourceUsagePerUser) {
@@ -270,7 +271,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
boolean ret = CapacitySchedulerPreemptionUtils
.tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
resToObtainByPartition, c, clusterResource, selectedCandidates,
- totalPreemptedResourceAllowed, true);
+ curCandidates, totalPreemptedResourceAllowed, true);
// Subtract from respective user's resource usage once a container is
// selected for preemption.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index 08d834e..89a015e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -48,11 +48,14 @@ public class PreemptableResourceCalculator
* @param isReservedPreemptionCandidatesSelector this will be set by
* different implementation of candidate selectors, please refer to
* TempQueuePerPartition#offer for details.
+ * @param allowQueuesBalanceAfterAllQueuesSatisfied
*/
public PreemptableResourceCalculator(
CapacitySchedulerPreemptionContext preemptionContext,
- boolean isReservedPreemptionCandidatesSelector) {
- super(preemptionContext, isReservedPreemptionCandidatesSelector);
+ boolean isReservedPreemptionCandidatesSelector,
+ boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
+ super(preemptionContext, isReservedPreemptionCandidatesSelector,
+ allowQueuesBalanceAfterAllQueuesSatisfied);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
index 4d8afaf..3c97364 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
@@ -34,6 +34,7 @@ import java.util.Set;
public abstract class PreemptionCandidatesSelector {
protected CapacitySchedulerPreemptionContext preemptionContext;
protected ResourceCalculator rc;
+ private long maximumKillWaitTime = -1;
PreemptionCandidatesSelector(
CapacitySchedulerPreemptionContext preemptionContext) {
@@ -77,4 +78,14 @@ public abstract class PreemptionCandidatesSelector {
});
}
+ public long getMaximumKillWaitTimeMs() {
+ if (maximumKillWaitTime > 0) {
+ return maximumKillWaitTime;
+ }
+ return preemptionContext.getDefaultMaximumKillWaitTimeout();
+ }
+
+ public void setMaximumKillWaitTime(long maximumKillWaitTime) {
+ this.maximumKillWaitTime = maximumKillWaitTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index cc69fba..036fd2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -131,6 +131,8 @@ public class ProportionalCapacityPreemptionPolicy
private List<PreemptionCandidatesSelector> candidatesSelectionPolicies;
private Set<String> allPartitions;
private Set<String> leafQueueNames;
+ Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+ Set<RMContainer>>> pcsMap;
// Preemptable Entities, synced from scheduler at every run
private Map<String, PreemptableQueue> preemptableQueues;
@@ -249,7 +251,21 @@ public class ProportionalCapacityPreemptionPolicy
// initialize candidates preemption selection policies
candidatesSelectionPolicies.add(new FifoCandidatesSelector(this,
- additionalPreemptionBasedOnReservedResource));
+ additionalPreemptionBasedOnReservedResource, false));
+
+ // Do we need to do preemption to balance queue even after queues get satisfied?
+ boolean isPreemptionToBalanceRequired = config.getBoolean(
+ CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+ CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED);
+ long maximumKillWaitTimeForPreemptionToQueueBalance = config.getLong(
+ CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
+ CapacitySchedulerConfiguration.DEFAULT_MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION);
+ if (isPreemptionToBalanceRequired) {
+ PreemptionCandidatesSelector selector = new FifoCandidatesSelector(this,
+ false, true);
+ selector.setMaximumKillWaitTime(maximumKillWaitTimeForPreemptionToQueueBalance);
+ candidatesSelectionPolicies.add(selector);
+ }
// Do we need to specially consider intra queue
boolean isIntraQueuePreemptionEnabled = config.getBoolean(
@@ -282,7 +298,8 @@ public class ProportionalCapacityPreemptionPolicy
"select_based_on_reserved_containers = " +
selectCandidatesForResevedContainers + "\n" +
"additional_res_balance_based_on_reserved_containers = " +
- additionalPreemptionBasedOnReservedResource);
+ additionalPreemptionBasedOnReservedResource + "\n" +
+ "Preemption-to-balance-queue-enabled = " + isPreemptionToBalanceRequired);
csConfig = config;
}
@@ -308,44 +325,60 @@ public class ProportionalCapacityPreemptionPolicy
}
private void preemptOrkillSelectedContainerAfterWait(
- Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
- long currentTime) {
+ Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+ Set<RMContainer>>> toPreemptPerSelector, long currentTime) {
+ int toPreemptCount = 0;
+ for (Map<ApplicationAttemptId, Set<RMContainer>> containers :
+ toPreemptPerSelector.values()) {
+ toPreemptCount += containers.size();
+ }
if (LOG.isDebugEnabled()) {
LOG.debug(
"Starting to preempt containers for selectedCandidates and size:"
- + selectedCandidates.size());
+ + toPreemptCount);
}
// preempt (or kill) the selected containers
- for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates
+ // We need toPreemptPerSelector here to match list of containers to
+ // its selector so that we can get custom timeout per selector when
+ // checking if current container should be killed or not
+ for (Map.Entry<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+ Set<RMContainer>>> pc : toPreemptPerSelector
.entrySet()) {
- ApplicationAttemptId appAttemptId = e.getKey();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Send to scheduler: in app=" + appAttemptId
- + " #containers-to-be-preemptionCandidates=" + e.getValue().size());
- }
- for (RMContainer container : e.getValue()) {
- // if we tried to preempt this for more than maxWaitTime
- if (preemptionCandidates.get(container) != null
- && preemptionCandidates.get(container)
- + maxWaitTime <= currentTime) {
- // kill it
- rmContext.getDispatcher().getEventHandler().handle(
- new ContainerPreemptEvent(appAttemptId, container,
- SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
- preemptionCandidates.remove(container);
- } else {
- if (preemptionCandidates.get(container) != null) {
- // We already updated the information to scheduler earlier, we need
- // not have to raise another event.
- continue;
+ Map<ApplicationAttemptId, Set<RMContainer>> cMap = pc.getValue();
+ if (cMap.size() > 0) {
+ for (Map.Entry<ApplicationAttemptId,
+ Set<RMContainer>> e : cMap.entrySet()) {
+ ApplicationAttemptId appAttemptId = e.getKey();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Send to scheduler: in app=" + appAttemptId
+ + " #containers-to-be-preemptionCandidates=" + e.getValue().size());
+ }
+ for (RMContainer container : e.getValue()) {
+ // if we tried to preempt this for more than maxWaitTime, this
+ // should be based on custom timeout per container per selector
+ if (preemptionCandidates.get(container) != null
+ && preemptionCandidates.get(container)
+ + pc.getKey().getMaximumKillWaitTimeMs() <= currentTime) {
+ // kill it
+ rmContext.getDispatcher().getEventHandler().handle(
+ new ContainerPreemptEvent(appAttemptId, container,
+ SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
+ preemptionCandidates.remove(container);
+ } else {
+ if (preemptionCandidates.get(container) != null) {
+ // We already sent this information to the scheduler earlier, so
+ // we do not need to raise another event.
+ continue;
+ }
+
+ //otherwise just send preemption events
+ rmContext.getDispatcher().getEventHandler().handle(
+ new ContainerPreemptEvent(appAttemptId, container,
+ SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION));
+ preemptionCandidates.put(container, currentTime);
+ }
}
-
- //otherwise just send preemption events
- rmContext.getDispatcher().getEventHandler().handle(
- new ContainerPreemptEvent(appAttemptId, container,
- SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION));
- preemptionCandidates.put(container, currentTime);
}
}
}
@@ -438,6 +471,8 @@ public class ProportionalCapacityPreemptionPolicy
// queue and each application
Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =
new HashMap<>();
+ Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+ Set<RMContainer>>> toPreemptPerSelector = new HashMap<>();;
for (PreemptionCandidatesSelector selector :
candidatesSelectionPolicies) {
long startTime = 0;
@@ -447,20 +482,27 @@ public class ProportionalCapacityPreemptionPolicy
selector.getClass().getName()));
startTime = clock.getTime();
}
- toPreempt = selector.selectCandidates(toPreempt,
- clusterResources, totalPreemptionAllowed);
+ Map<ApplicationAttemptId, Set<RMContainer>> curCandidates =
+ selector.selectCandidates(toPreempt, clusterResources,
+ totalPreemptionAllowed);
+ toPreemptPerSelector.putIfAbsent(selector, curCandidates);
if (LOG.isDebugEnabled()) {
LOG.debug(MessageFormat
.format("{0} uses {1} millisecond to run",
selector.getClass().getName(), clock.getTime() - startTime));
int totalSelected = 0;
+ int curSelected = 0;
for (Set<RMContainer> set : toPreempt.values()) {
totalSelected += set.size();
}
+ for (Set<RMContainer> set : curCandidates.values()) {
+ curSelected += set.size();
+ }
LOG.debug(MessageFormat
- .format("So far, total {0} containers selected to be preempted",
- totalSelected));
+ .format("So far, total {0} containers selected to be preempted, {1}"
+ + " containers selected this round\n",
+ totalSelected, curSelected));
}
}
@@ -483,8 +525,10 @@ public class ProportionalCapacityPreemptionPolicy
long currentTime = clock.getTime();
+ pcsMap = toPreemptPerSelector;
+
// preempt (or kill) the selected containers
- preemptOrkillSelectedContainerAfterWait(toPreempt, currentTime);
+ preemptOrkillSelectedContainerAfterWait(toPreemptPerSelector, currentTime);
// cleanup staled preemption candidates
cleanupStaledPreemptionCandidates(currentTime);
@@ -689,6 +733,12 @@ public class ProportionalCapacityPreemptionPolicy
return queueToPartitions;
}
+ @VisibleForTesting
+ Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+ Set<RMContainer>>> getToPreemptCandidatesPerSelector() {
+ return pcsMap;
+ }
+
@Override
public int getClusterMaxApplicationPriority() {
return scheduler.getMaxClusterLevelAppPriority().getPriority();
@@ -730,4 +780,9 @@ public class ProportionalCapacityPreemptionPolicy
public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() {
return intraQueuePreemptionOrderPolicy;
}
+
+ @Override
+ public long getDefaultMaximumKillWaitTimeout() {
+ return maxWaitTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
index 4a169af..78a9988 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
@@ -380,6 +380,7 @@ public class QueuePriorityContainerCandidateSelector
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource,
Resource totalPreemptedResourceAllowed) {
+ Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();
// Initialize digraph from queues
// TODO (wangda): only do this when queue refreshed.
priorityDigraph.clear();
@@ -388,7 +389,7 @@ public class QueuePriorityContainerCandidateSelector
// When all queues are set to same priority, or priority is not respected,
// direct return.
if (priorityDigraph.isEmpty()) {
- return selectedCandidates;
+ return curCandidates;
}
// Save parameters to be shared by other methods
@@ -478,13 +479,9 @@ public class QueuePriorityContainerCandidateSelector
.getReservedResource());
}
- Set<RMContainer> containers = selectedCandidates.get(
- c.getApplicationAttemptId());
- if (null == containers) {
- containers = new HashSet<>();
- selectedCandidates.put(c.getApplicationAttemptId(), containers);
- }
- containers.add(c);
+ // Add to preemptMap
+ CapacitySchedulerPreemptionUtils.addToPreemptMap(selectedCandidates,
+ curCandidates, c.getApplicationAttemptId(), c);
// Update totalPreemptionResourceAllowed
Resources.subtractFrom(totalPreemptedResourceAllowed,
@@ -504,7 +501,6 @@ public class QueuePriorityContainerCandidateSelector
}
}
}
-
- return selectedCandidates;
+ return curCandidates;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
index ff100d9..bdb7e8c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
@@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -63,7 +62,7 @@ public class ReservedContainerCandidatesSelector
CapacitySchedulerPreemptionContext preemptionContext) {
super(preemptionContext);
preemptableAmountCalculator = new PreemptableResourceCalculator(
- preemptionContext, true);
+ preemptionContext, true, false);
}
@Override
@@ -71,6 +70,7 @@ public class ReservedContainerCandidatesSelector
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource,
Resource totalPreemptedResourceAllowed) {
+ Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();
// Calculate how much resources we need to preempt
preemptableAmountCalculator.computeIdealAllocation(clusterResource,
totalPreemptedResourceAllowed);
@@ -101,14 +101,10 @@ public class ReservedContainerCandidatesSelector
selectedCandidates, totalPreemptedResourceAllowed, false);
if (null != preemptionResult) {
for (RMContainer c : preemptionResult.selectedContainers) {
- ApplicationAttemptId appId = c.getApplicationAttemptId();
- Set<RMContainer> containers = selectedCandidates.get(appId);
- if (null == containers) {
- containers = new HashSet<>();
- selectedCandidates.put(appId, containers);
- }
+ // Add to preemptMap
+ CapacitySchedulerPreemptionUtils.addToPreemptMap(selectedCandidates,
+ curCandidates, c.getApplicationAttemptId(), c);
- containers.add(c);
if (LOG.isDebugEnabled()) {
LOG.debug(this.getClass().getName() + " Marked container=" + c
.getContainerId() + " from queue=" + c.getQueueName()
@@ -118,7 +114,7 @@ public class ReservedContainerCandidatesSelector
}
}
- return selectedCandidates;
+ return curCandidates;
}
private Resource getPreemptableResource(String queueName,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 4214acc..4fb1862 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -138,7 +138,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
// This function "accepts" all the resources it can (pending) and return
// the unused ones
Resource offer(Resource avail, ResourceCalculator rc,
- Resource clusterResource, boolean considersReservedResource) {
+ Resource clusterResource, boolean considersReservedResource,
+ boolean allowQueueBalanceAfterAllSafisfied) {
Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
Resources.subtract(getMax(), idealAssigned),
Resource.newInstance(0, 0));
@@ -179,7 +180,10 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
// leaf queues. Such under-utilized leaf queue could preemption resources
// from over-utilized leaf queue located at other hierarchies.
- accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted);
+ // Allow queues to continue growing and balancing even if all queues are satisfied.
+ if (!allowQueueBalanceAfterAllSafisfied) {
+ accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted);
+ }
// accepted so far contains the "quota acceptable" amount, we now filter by
// locality acceptable
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 76eaac0..f94654e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1460,6 +1460,23 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first";
/**
+ * Whether queues are allowed to continue growing after all queues have
+ * reached their guaranteed capacity.
+ */
+ public static final String PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED =
+ PREEMPTION_CONFIG_PREFIX + "preemption-to-balance-queue-after-satisfied.enabled";
+ public static final boolean DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED = false;
+
+ /**
+ * How long we will wait to balance queues, by default it is 5 mins.
+ */
+ public static final String MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION =
+ PREEMPTION_CONFIG_PREFIX + "preemption-to-balance-queue-after-satisfied.max-wait-before-kill";
+ public static final long
+ DEFAULT_MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION =
+ 300 * 1000;
+
+ /**
* Maximum application for a queue to be used when application per queue is
* not defined.To be consistent with previous version the default value is set
* as UNDEFINED.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
index 6a953cf..38c2a2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -538,4 +539,61 @@ public class TestPreemptionForQueueWithPriorities
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
+
+ @Test
+ public void testPriorityPreemptionForBalanceBetweenSatisfiedQueues()
+ throws IOException {
+ /**
+ * Queues b and c are beyond their guarantee, and c has higher priority than
+ * b. c asks for more resources and there is no idle capacity left, so c
+ * should preempt some resources from b but won't push b below its guarantee.
+ *
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / | \
+ * a b c
+ * </pre>
+ *
+ * For priorities
+ * - a=1
+ * - b=1
+ * - c=2
+ *
+ */
+ String labelsConfig = "=100,true"; // default partition
+ String nodesConfig = "n1="; // only one node
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100 100 100 100]);" + //root
+ "-a(=[30 100 0 0]){priority=1};" + // a
+ "-b(=[30 100 40 50]){priority=1};" + // b
+ "-c(=[40 100 60 25]){priority=2}"; // c
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "b\t(1,1,n1,,40,false);" + // app1 in b
+ "c\t(1,1,n1,,60,false)"; // app2 in c
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ CapacitySchedulerConfiguration newConf =
+ new CapacitySchedulerConfiguration(conf);
+ boolean isPreemptionToBalanceRequired = true;
+ newConf.setBoolean(
+ CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+ isPreemptionToBalanceRequired);
+ when(cs.getConfiguration()).thenReturn(newConf);
+ policy.editSchedule();
+
+ // IdealAssigned b: 30 c: 70. initIdealAssigned: b: 30 c: 40. Even though
+ // b and c have the same relativeAssigned=1.0f (idealAssigned / guaranteed),
+ // c has higher priority, so c will be put in mostUnderServedQueue and
+ // get all of the remaining 30 capacity.
+ verify(mDisp, times(10)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, never()).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java
new file mode 100644
index 0000000..22e8f63
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
+ extends ProportionalCapacityPreemptionPolicyMockFramework {
+
+ @Test
+ public void testPreemptionToBalanceDisabled() throws IOException {
+ String labelsConfig = "=100,true"; // default partition
+ String nodesConfig = "n1="; // only one node
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100 100 100 100]);" + //root
+ "-a(=[30 100 10 30]);" + // a
+ "-b(=[30 100 40 30]);" + // b
+ "-c(=[30 100 50 30]);" + // c
+ "-d(=[10 100 0 0])"; // d
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t(1,1,n1,,10,false);" + // app1 in a
+ "b\t(1,1,n1,,40,false);" + // app2 in b
+ "c\t(1,1,n1,,50,false)"; // app3 in c
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // I_A: A:30 B:35 C:35, preempt 5 from B and 15 from C to A
+ verify(mDisp, times(5)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ verify(mDisp, times(15)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+
+ assertEquals(30, policy.getQueuePartitions().get("a")
+ .get("").getIdealAssigned().getMemorySize());
+ assertEquals(35, policy.getQueuePartitions().get("b")
+ .get("").getIdealAssigned().getMemorySize());
+ assertEquals(35, policy.getQueuePartitions().get("c")
+ .get("").getIdealAssigned().getMemorySize());
+ }
+
+ @Test
+ public void testPreemptionToBalanceEnabled() throws IOException {
+ String labelsConfig = "=100,true"; // default partition
+ String nodesConfig = "n1="; // only one node
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100 100 100 100]);" + //root
+ "-a(=[30 100 10 30]);" + // a
+ "-b(=[30 100 40 30]);" + // b
+ "-c(=[30 100 50 30]);" + // c
+ "-d(=[10 100 0 0])"; // d
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t(1,1,n1,,10,false);" + // app1 in a
+ "b\t(1,1,n1,,40,false);" + // app2 in b
+ "c\t(1,1,n1,,50,false)"; // app3 in c
+
+ // enable preempt to balance and ideal assignment will change.
+ boolean isPreemptionToBalanceEnabled = true;
+ conf.setBoolean(
+ CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+ isPreemptionToBalanceEnabled);
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // I_A: A:33 B:33 C:33, preempt 7 from B and 17 from C to A
+ verify(mDisp, times(7)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ verify(mDisp, times(17)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+
+ assertEquals(33, policy.getQueuePartitions().get("a")
+ .get("").getIdealAssigned().getMemorySize());
+ assertEquals(33, policy.getQueuePartitions().get("b")
+ .get("").getIdealAssigned().getMemorySize());
+ assertEquals(33, policy.getQueuePartitions().get("c")
+ .get("").getIdealAssigned().getMemorySize());
+ }
+
+
+ @Test
+ public void testPreemptionToBalanceUsedPlusPendingLessThanGuaranteed()
+ throws IOException{
+ String labelsConfig = "=100,true"; // default partition
+ String nodesConfig = "n1="; // only one node
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100 100 100 100]);" + //root
+ "-a(=[30 100 10 6]);" + // a
+ "-b(=[30 100 40 30]);" + // b
+ "-c(=[30 100 50 30]);" + // c
+ "-d(=[10 100 0 0])"; // d
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t(1,1,n1,,10,false);" + // app1 in a
+ "b\t(1,1,n1,,40,false);" + // app2 in b
+ "c\t(1,1,n1,,50,false)"; // app3 in c
+
+ boolean isPreemptionToBalanceEnabled = true;
+ conf.setBoolean(
+ CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+ isPreemptionToBalanceEnabled);
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // I_A: A:16 B:42 C:42, preempt 8 from C (nothing is preempted from B)
+ verify(mDisp, times(8)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(3))));
+
+ assertEquals(16, policy.getQueuePartitions().get("a")
+ .get("").getIdealAssigned().getMemorySize());
+ assertEquals(42, policy.getQueuePartitions().get("b")
+ .get("").getIdealAssigned().getMemorySize());
+ assertEquals(42, policy.getQueuePartitions().get("c")
+ .get("").getIdealAssigned().getMemorySize());
+ }
+
+ @Test
+ public void testPreemptionToBalanceWithVcoreResource() throws IOException {
+ Logger.getRootLogger().setLevel(Level.DEBUG);
+ String labelsConfig = "=100:100,true"; // default partition
+ String nodesConfig = "n1="; // only one node
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100:100 100:100 100:100 120:140]);" + //root
+ "-a(=[60:60 100:100 40:40 70:40]);" + // a
+ "-b(=[40:40 100:100 60:60 50:100])"; // b
+
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t(1,1:1,n1,,40,false);" + // app1 in a
+ "b\t(1,1:1,n1,,60,false)"; // app2 in b
+
+ boolean isPreemptionToBalanceEnabled = true;
+ conf.setBoolean(
+ CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+ isPreemptionToBalanceEnabled);
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
+ policy.editSchedule();
+
+ // 21 containers will be preempted here
+ verify(mDisp, times(21)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.
+ IsPreemptionRequestFor(getAppAttemptId(2))));
+
+ assertEquals(60, policy.getQueuePartitions().get("a")
+ .get("").getIdealAssigned().getMemorySize());
+ assertEquals(60, policy.getQueuePartitions().get("a")
+ .get("").getIdealAssigned().getVirtualCores());
+ assertEquals(40, policy.getQueuePartitions().get("b")
+ .get("").getIdealAssigned().getMemorySize());
+ assertEquals(40, policy.getQueuePartitions().get("b")
+ .get("").getIdealAssigned().getVirtualCores());
+ }
+
+ @Test
+ public void testPreemptionToBalanceWithConfiguredTimeout() throws IOException {
+ Logger.getRootLogger().setLevel(Level.DEBUG);
+ String labelsConfig = "=100:100,true"; // default partition
+ String nodesConfig = "n1="; // only one node
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100:100 100:100 100:100 120:140]);" + //root
+ "-a(=[60:60 100:100 40:40 70:40]);" + // a
+ "-b(=[40:40 100:100 60:60 50:100])"; // b
+
+ String appsConfig =
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t(1,1:1,n1,,40,false);" + // app1 in a
+ "b\t(1,1:1,n1,,60,false)"; // app2 in b
+
+ boolean isPreemptionToBalanceEnabled = true;
+ conf.setBoolean(
+ CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+ isPreemptionToBalanceEnabled);
+ final long FB_MAX_BEFORE_KILL = 60 *1000;
+ conf.setLong(
+ CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
+ FB_MAX_BEFORE_KILL);
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
+ policy.editSchedule();
+
+ Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+ Set<RMContainer>>> pcps= policy.getToPreemptCandidatesPerSelector();
+
+ String FIFO_CANDIDATE_SELECTOR = "FifoCandidatesSelector";
+ boolean hasFifoSelector = false;
+ for (Map.Entry<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+ Set<RMContainer>>> pc : pcps.entrySet()) {
+ if (pc.getKey().getClass().getSimpleName().equals(FIFO_CANDIDATE_SELECTOR)) {
+ FifoCandidatesSelector pcs = (FifoCandidatesSelector) pc.getKey();
+ if (pcs.getAllowQueuesBalanceAfterAllQueuesSatisfied() == true) {
+ hasFifoSelector = true;
+ assertEquals(pcs.getMaximumKillWaitTimeMs(), FB_MAX_BEFORE_KILL);
+ }
+ }
+ }
+
+ assertEquals(hasFifoSelector, true);
+
+ // 21 containers will be preempted here
+ verify(mDisp, times(21)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.
+ IsPreemptionRequestFor(getAppAttemptId(2))));
+
+ assertEquals(60, policy.getQueuePartitions().get("a")
+ .get("").getIdealAssigned().getMemorySize());
+ assertEquals(60, policy.getQueuePartitions().get("a")
+ .get("").getIdealAssigned().getVirtualCores());
+ assertEquals(40, policy.getQueuePartitions().get("b")
+ .get("").getIdealAssigned().getMemorySize());
+ assertEquals(40, policy.getQueuePartitions().get("b")
+ .get("").getIdealAssigned().getVirtualCores());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
index 2aff82d..800789a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -1111,5 +1111,116 @@ public class TestCapacitySchedulerSurgicalPreemption
rm1.close();
}
+ @Test(timeout = 600000)
+ public void testPreemptionToBalanceWithCustomTimeout() throws Exception {
+ /**
+ * Test case: Submit two application (app1/app2) to different queues, queue
+ * structure:
+ *
+ * <pre>
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ * </pre>
+ *
+ * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+ *
+ * 2) app1 is submitted to queue-a and asks for 1G * 38 (plus its 1G AM)
+ *
+ * 3) app2 submit to queue-c, ask for one 4G container (for AM)
+ *
+ * After preemption, we should expect:
+ * 1. Preempt 4 containers from app1
+ * 2. the selected containers will be killed after configured timeout.
+ * 3. AM of app2 successfully allocated.
+ */
+ conf.setBoolean(
+ CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+ true);
+ conf.setLong(
+ CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
+ 20*1000);
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
+ this.conf);
+
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 38, new ArrayList<ContainerId>());
+
+ // Do allocation for node1/node2
+ for (int i = 0; i < 38; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 39 containers now
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(39, schedulerApp1.getLiveContainers().size());
+ // 20 from n1 and 19 from n2
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+ am1.getApplicationAttemptId(), 20);
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+ am1.getApplicationAttemptId(), 19);
+
+
+ // Submit app2 to queue-c and asks for a 4G container for AM
+ RMApp app2 = rm1.submitApp(4 * GB, "app", "user", null, "c");
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+ // Call editSchedule: containers are selected to be preemption candidate
+ SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+ getResourceScheduler()).getSchedulingMonitorManager();
+ SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
+ editPolicy.editSchedule();
+ Assert.assertEquals(4, editPolicy.getToPreemptContainers().size());
+
+ // check live containers immediately; nothing should have been killed yet
+ Assert.assertEquals(39, schedulerApp1.getLiveContainers().size());
+
+ Thread.sleep(20*1000);
+ // Call editSchedule again: selected containers are killed
+ editPolicy.editSchedule();
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 35);
+
+ // Call allocation, containers are reserved
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
+
+ // Call editSchedule twice and allocation once, container should get allocated
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ int tick = 0;
+ while (schedulerApp2.getLiveContainers().size() != 1 && tick < 10) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ tick++;
+ Thread.sleep(100);
+ }
+ waitNumberOfReservedContainersFromApp(schedulerApp2, 0);
+
+ rm1.close();
+
+
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org