You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2020/06/19 12:50:44 UTC
[hadoop] branch trunk updated: YARN-9930. Support max running app
logic for CapacityScheduler. Contributed by Peter Bacsko
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4698414 YARN-9930. Support max running app logic for CapacityScheduler. Contributed by Peter Bacsko
4698414 is described below
commit 469841446f921f3da5bbd96cf83b3a808dde8084
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Fri Jun 19 14:50:24 2020 +0200
YARN-9930. Support max running app logic for CapacityScheduler. Contributed by Peter Bacsko
---
.../scheduler/capacity/AbstractCSQueue.java | 16 +
.../capacity/CSMaxRunningAppsEnforcer.java | 436 +++++++++++++++++++++
.../scheduler/capacity/CapacityScheduler.java | 19 +-
.../capacity/CapacitySchedulerConfiguration.java | 35 +-
.../scheduler/capacity/LeafQueue.java | 81 +++-
.../scheduler/capacity/ParentQueue.java | 30 ++
.../scheduler/common/fica/FiCaSchedulerApp.java | 21 +
.../reservation/TestReservationSystem.java | 4 +
.../scheduler/capacity/TestApplicationLimits.java | 1 +
.../capacity/TestCSMaxRunningAppsEnforcer.java | 278 +++++++++++++
.../TestCapacitySchedulerMaxParallelApps.java | 312 +++++++++++++++
.../scheduler/capacity/TestLeafQueue.java | 2 +
.../scheduler/capacity/TestQueueState.java | 1 +
.../scheduler/capacity/TestQueueStateManager.java | 1 +
14 files changed, 1229 insertions(+), 8 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 0a4a14f..968d971 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -146,6 +146,7 @@ public abstract class AbstractCSQueue implements CSQueue {
volatile Priority priority = Priority.newInstance(0);
private Map<String, Float> userWeights = new HashMap<String, Float>();
+ private int maxParallelApps;
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -390,6 +391,11 @@ public abstract class AbstractCSQueue implements CSQueue {
// and queue setting
setupMaximumAllocation(configuration);
+ // Max parallel apps
+ int queueMaxParallelApps =
+ configuration.getMaxParallelAppsForQueue(getQueuePath());
+ setMaxParallelApps(queueMaxParallelApps);
+
// initialized the queue state based on previous state, configured state
// and its parent state.
QueueState previous = getState();
@@ -1431,4 +1437,14 @@ public abstract class AbstractCSQueue implements CSQueue {
public boolean getDefaultAppLifetimeWasSpecifiedInConfig() {
return defaultAppLifetimeWasSpecifiedInConfig;
}
+
+ public void setMaxParallelApps(int maxParallelApps) {
+ this.maxParallelApps = maxParallelApps;
+ }
+
+ public int getMaxParallelApps() {
+ return maxParallelApps;
+ }
+
+ abstract int getNumRunnableApps();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java
new file mode 100644
index 0000000..d1a62b4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+
+/**
+ * Handles tracking and enforcement for user and queue maxRunningApps
+ * constraints.
+ */
+public class CSMaxRunningAppsEnforcer {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ CSMaxRunningAppsEnforcer.class);
+
+ private final CapacityScheduler scheduler;
+
+ // Tracks the number of running applications by user.
+ private final Map<String, Integer> usersNumRunnableApps;
+
+ private final ListMultimap<String, FiCaSchedulerApp> usersNonRunnableApps;
+
+ public CSMaxRunningAppsEnforcer(CapacityScheduler scheduler) {
+ this.scheduler = scheduler;
+ this.usersNumRunnableApps = new HashMap<String, Integer>();
+ this.usersNonRunnableApps = ArrayListMultimap.create();
+ }
+
+ /**
+ * Checks whether making the application runnable would exceed any
+ * maxRunningApps limits. Also sets the "runnable" flag on the
+ * attempt.
+ *
+ * @param attempt the app attempt being checked
+ * @return true if the application is runnable; false otherwise
+ */
+ public boolean checkRunnabilityWithUpdate(
+ FiCaSchedulerApp attempt) {
+ boolean attemptCanRun = !exceedUserMaxParallelApps(attempt.getUser())
+ && !exceedQueueMaxParallelApps(attempt.getCSLeafQueue());
+
+ attempt.setRunnable(attemptCanRun);
+
+ return attemptCanRun;
+ }
+
+ /**
+ * Checks whether the number of user runnable apps exceeds the limitation.
+ *
+ * @param user the user name
+ * @return true if the number hits the limit; false otherwise
+ */
+ private boolean exceedUserMaxParallelApps(String user) {
+ Integer userNumRunnable = usersNumRunnableApps.get(user);
+ if (userNumRunnable == null) {
+ userNumRunnable = 0;
+ }
+ if (userNumRunnable >= getUserMaxParallelApps(user)) {
+ LOG.info("Maximum runnable apps exceeded for user {}", user);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Recursively checks whether the number of queue runnable apps exceeds the
+ * limitation.
+ *
+ * @param queue the current queue
+ * @return true if the number hits the limit; false otherwise
+ */
+ private boolean exceedQueueMaxParallelApps(AbstractCSQueue queue) {
+ // Check queue and all parent queues
+ while (queue != null) {
+ if (queue.getNumRunnableApps() >= queue.getMaxParallelApps()) {
+ LOG.info("Maximum runnable apps exceeded for queue {}",
+ queue.getQueuePath());
+ return true;
+ }
+ queue = (AbstractCSQueue) queue.getParent();
+ }
+
+ return false;
+ }
+
+ public void trackApp(FiCaSchedulerApp app) {
+ if (app.isRunnable()) {
+ trackRunnableApp(app);
+ } else {
+ trackNonRunnableApp(app);
+ }
+ }
+ /**
+ * Tracks the given new runnable app for purposes of maintaining max running
+ * app limits.
+ */
+ private void trackRunnableApp(FiCaSchedulerApp app) {
+ String user = app.getUser();
+ AbstractCSQueue queue = (AbstractCSQueue) app.getQueue();
+ // Increment running counts for all parent queues
+ ParentQueue parent = (ParentQueue) queue.getParent();
+ while (parent != null) {
+ parent.incrementRunnableApps();
+ parent = (ParentQueue) parent.getParent();
+ }
+
+ Integer userNumRunnable = usersNumRunnableApps.get(user);
+ usersNumRunnableApps.put(user, (userNumRunnable == null ? 0
+ : userNumRunnable) + 1);
+ }
+
+ /**
+ * Tracks the given new non runnable app so that it can be made runnable when
+ * it would not violate max running app limits.
+ */
+ private void trackNonRunnableApp(FiCaSchedulerApp app) {
+ String user = app.getUser();
+ usersNonRunnableApps.put(user, app);
+ }
+
+ /**
+ * This is called after reloading the allocation configuration when the
+ * scheduler is reinitialized
+ *
+ * Checks to see whether any non-runnable applications become runnable
+ * now that the max running apps of given queue has been changed
+ *
+ * Runs in O(n) where n is the number of apps that are non-runnable and in
+ * the queues that went from having no slack to having slack.
+ */
+
+ public void updateRunnabilityOnReload() {
+ ParentQueue rootQueue = (ParentQueue) scheduler.getRootQueue();
+ List<List<FiCaSchedulerApp>> appsNowMaybeRunnable =
+ new ArrayList<List<FiCaSchedulerApp>>();
+
+ gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable);
+
+ updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Checks to see whether any other applications runnable now that the given
+ * application has been removed from the given queue. And makes them so.
+ *
+ * Runs in O(n log(n)) where n is the number of queues that are under the
+ * highest queue that went from having no slack to having slack.
+ */
+ public void updateRunnabilityOnAppRemoval(FiCaSchedulerApp app) {
+ // childqueueX might have no pending apps itself, but if a queue higher up
+ // in the hierarchy parentqueueY has a maxRunningApps set, an app completion
+ // in childqueueX could allow an app in some other distant child of
+ // parentqueueY to become runnable.
+ // An app removal will only possibly allow another app to become runnable if
+ // the queue was already at its max before the removal.
+ // Thus we find the ancestor queue highest in the tree for which the app
+ // that was at its maxRunningApps before the removal.
+ LeafQueue queue = app.getCSLeafQueue();
+ AbstractCSQueue highestQueueWithAppsNowRunnable =
+ (queue.getNumRunnableApps() == queue.getMaxParallelApps() - 1)
+ ? queue : null;
+
+ ParentQueue parent = (ParentQueue) queue.getParent();
+ while (parent != null) {
+ if (parent.getNumRunnableApps() == parent.getMaxParallelApps() - 1) {
+ highestQueueWithAppsNowRunnable = parent;
+ }
+ parent = (ParentQueue) parent.getParent();
+ }
+
+ List<List<FiCaSchedulerApp>> appsNowMaybeRunnable =
+ new ArrayList<List<FiCaSchedulerApp>>();
+
+ // Compile lists of apps which may now be runnable
+ // We gather lists instead of building a set of all non-runnable apps so
+ // that this whole operation can be O(number of queues) instead of
+ // O(number of apps)
+ if (highestQueueWithAppsNowRunnable != null) {
+ gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable,
+ appsNowMaybeRunnable);
+ }
+ String user = app.getUser();
+ Integer userNumRunning = usersNumRunnableApps.get(user);
+ if (userNumRunning == null) {
+ userNumRunning = 0;
+ }
+ if (userNumRunning == getUserMaxParallelApps(user) - 1) {
+ List<FiCaSchedulerApp> userWaitingApps = usersNonRunnableApps.get(user);
+ if (userWaitingApps != null) {
+ appsNowMaybeRunnable.add(userWaitingApps);
+ }
+ }
+
+ updateAppsRunnability(appsNowMaybeRunnable,
+ appsNowMaybeRunnable.size());
+ }
+
+ /**
+ * Checks to see whether applications are runnable now by iterating
+ * through each one of them and check if the queue and user have slack.
+ *
+ * if we know how many apps can be runnable, there is no need to iterate
+ * through all apps, maxRunnableApps is used to break out of the iteration.
+ */
+ private void updateAppsRunnability(List<List<FiCaSchedulerApp>>
+ appsNowMaybeRunnable, int maxRunnableApps) {
+ // Scan through and check whether this means that any apps are now runnable
+ Iterator<FiCaSchedulerApp> iter = new MultiListStartTimeIterator(
+ appsNowMaybeRunnable);
+ FiCaSchedulerApp prev = null;
+ List<FiCaSchedulerApp> noLongerPendingApps = new ArrayList<>();
+ while (iter.hasNext()) {
+ FiCaSchedulerApp next = iter.next();
+ if (next == prev) {
+ continue;
+ }
+
+ if (checkRunnabilityWithUpdate(next)) {
+ LeafQueue nextQueue = next.getCSLeafQueue();
+ LOG.info("{} is now runnable in {}",
+ next.getApplicationAttemptId(), nextQueue);
+ trackRunnableApp(next);
+ FiCaSchedulerApp appSched = next;
+ nextQueue.submitApplicationAttempt(next, next.getUser());
+ noLongerPendingApps.add(appSched);
+
+ if (noLongerPendingApps.size() >= maxRunnableApps) {
+ break;
+ }
+ }
+
+ prev = next;
+ }
+
+ // We remove the apps from their pending lists afterwards so that we don't
+ // pull them out from under the iterator. If they are not in these lists
+ // in the first place, there is a bug.
+ for (FiCaSchedulerApp appSched : noLongerPendingApps) {
+ if (!(appSched.getCSLeafQueue().removeNonRunnableApp(appSched))) {
+ LOG.error("Can't make app runnable that does not already exist in queue"
+ + " as non-runnable: {}. This should never happen.",
+ appSched.getApplicationAttemptId());
+ }
+
+ if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) {
+ LOG.error("Waiting app {} expected to be in "
+ + "usersNonRunnableApps, but was not. This should never happen.",
+ appSched.getApplicationAttemptId());
+ }
+ }
+ }
+
+ public void untrackApp(FiCaSchedulerApp app) {
+ if (app.isRunnable()) {
+ untrackRunnableApp(app);
+ } else {
+ untrackNonRunnableApp(app);
+ }
+ }
+
+ /**
+ * Updates the relevant tracking variables after a runnable app with the given
+ * queue and user has been removed.
+ */
+ private void untrackRunnableApp(FiCaSchedulerApp app) {
+ // Update usersRunnableApps
+ String user = app.getUser();
+ int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
+ if (newUserNumRunning == 0) {
+ usersNumRunnableApps.remove(user);
+ } else {
+ usersNumRunnableApps.put(user, newUserNumRunning);
+ }
+
+ // Update runnable app bookkeeping for queues
+ AbstractCSQueue queue = (AbstractCSQueue) app.getQueue();
+ ParentQueue parent = (ParentQueue) queue.getParent();
+ while (parent != null) {
+ parent.decrementRunnableApps();
+ parent = (ParentQueue) parent.getParent();
+ }
+ }
+
+ /**
+ * Stops tracking the given non-runnable app.
+ */
+ private void untrackNonRunnableApp(FiCaSchedulerApp app) {
+ usersNonRunnableApps.remove(app.getUser(), app);
+ }
+
+ /**
+ * Traverses the queue hierarchy under the given queue to gather all lists
+ * of non-runnable applications.
+ */
+ private void gatherPossiblyRunnableAppLists(AbstractCSQueue queue,
+ List<List<FiCaSchedulerApp>> appLists) {
+ if (queue.getNumRunnableApps() < queue.getMaxParallelApps()) {
+ if (queue instanceof LeafQueue) {
+ appLists.add(
+ ((LeafQueue)queue).getCopyOfNonRunnableAppSchedulables());
+ } else {
+ for (CSQueue child : queue.getChildQueues()) {
+ gatherPossiblyRunnableAppLists((AbstractCSQueue) child, appLists);
+ }
+ }
+ }
+ }
+
+ private int getUserMaxParallelApps(String user) {
+ CapacitySchedulerConfiguration conf = scheduler.getConfiguration();
+ if (conf == null) {
+ return Integer.MAX_VALUE;
+ }
+
+ int userMaxParallelApps = conf.getMaxParallelAppsForUser(user);
+
+ return userMaxParallelApps;
+ }
+
+ /**
+ * Takes a list of lists, each of which is ordered by start time, and returns
+ * their elements in order of start time.
+ *
+ * We maintain positions in each of the lists. Each next() call advances
+ * the position in one of the lists. We maintain a heap that orders lists
+ * by the start time of the app in the current position in that list.
+ * This allows us to pick which list to advance in O(log(num lists)) instead
+ * of O(num lists) time.
+ */
+ static class MultiListStartTimeIterator implements
+ Iterator<FiCaSchedulerApp> {
+
+ private List<FiCaSchedulerApp>[] appLists;
+ private int[] curPositionsInAppLists;
+ private PriorityQueue<IndexAndTime> appListsByCurStartTime;
+
+ @SuppressWarnings("unchecked")
+ MultiListStartTimeIterator(List<List<FiCaSchedulerApp>> appListList) {
+ appLists = appListList.toArray(new List[appListList.size()]);
+ curPositionsInAppLists = new int[appLists.length];
+ appListsByCurStartTime = new PriorityQueue<IndexAndTime>();
+ for (int i = 0; i < appLists.length; i++) {
+ long time = appLists[i].isEmpty() ? Long.MAX_VALUE : appLists[i].get(0)
+ .getStartTime();
+ appListsByCurStartTime.add(new IndexAndTime(i, time));
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !appListsByCurStartTime.isEmpty()
+ && appListsByCurStartTime.peek().time != Long.MAX_VALUE;
+ }
+
+ @Override
+ public FiCaSchedulerApp next() {
+ IndexAndTime indexAndTime = appListsByCurStartTime.remove();
+ int nextListIndex = indexAndTime.index;
+ FiCaSchedulerApp next = appLists[nextListIndex]
+ .get(curPositionsInAppLists[nextListIndex]);
+ curPositionsInAppLists[nextListIndex]++;
+
+ if (curPositionsInAppLists[nextListIndex] <
+ appLists[nextListIndex].size()) {
+ indexAndTime.time = appLists[nextListIndex]
+ .get(curPositionsInAppLists[nextListIndex]).getStartTime();
+ } else {
+ indexAndTime.time = Long.MAX_VALUE;
+ }
+ appListsByCurStartTime.add(indexAndTime);
+
+ return next;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove not supported");
+ }
+
+ private static class IndexAndTime implements Comparable<IndexAndTime> {
+ private int index;
+ private long time;
+
+ IndexAndTime(int index, long time) {
+ this.index = index;
+ this.time = time;
+ }
+
+ @Override
+ public int compareTo(IndexAndTime o) {
+ return time < o.time ? -1 : (time > o.time ? 1 : 0);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof IndexAndTime)) {
+ return false;
+ }
+ IndexAndTime other = (IndexAndTime)o;
+ return other.time == time;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int)time;
+ }
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index a6aa824..bd2acd7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -242,8 +242,11 @@ public class CapacityScheduler extends
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private long asyncMaxPendingBacklogs;
+ private CSMaxRunningAppsEnforcer maxRunningEnforcer;
+
public CapacityScheduler() {
super(CapacityScheduler.class.getName());
+ this.maxRunningEnforcer = new CSMaxRunningAppsEnforcer(this);
}
@Override
@@ -483,6 +486,7 @@ public class CapacityScheduler extends
super.reinitialize(newConf, rmContext);
}
+ maxRunningEnforcer.updateRunnabilityOnReload();
} finally {
writeLock.unlock();
}
@@ -1083,6 +1087,9 @@ public class CapacityScheduler extends
// SchedulerApplication#setCurrentAppAttempt.
attempt.setPriority(application.getPriority());
+ maxRunningEnforcer.checkRunnabilityWithUpdate(attempt);
+ maxRunningEnforcer.trackApp(attempt);
+
queue.submitApplicationAttempt(attempt, application.getUser());
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user " + application.getUser() + " in queue "
@@ -1176,8 +1183,13 @@ public class CapacityScheduler extends
LOG.error(
"Cannot finish application " + "from non-leaf queue: "
+ csQueue.getQueuePath());
- } else{
+ } else {
csQueue.finishApplicationAttempt(attempt, csQueue.getQueuePath());
+
+ maxRunningEnforcer.untrackApp(attempt);
+ if (attempt.isRunnable()) {
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
+ }
}
} finally {
writeLock.unlock();
@@ -3253,4 +3265,9 @@ public class CapacityScheduler extends
public int getNumAsyncSchedulerThreads() {
return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
}
+
+ @VisibleForTesting
+ public void setMaxRunningAppsEnforcer(CSMaxRunningAppsEnforcer enforcer) {
+ this.maxRunningEnforcer = enforcer;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 496dd0b..3bebb44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -378,6 +378,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE);
+ public static final String MAX_PARALLEL_APPLICATIONS = "max-parallel-apps";
+
+ public static final int DEFAULT_MAX_PARALLEL_APPLICATIONS = Integer.MAX_VALUE;
+
/**
* Different resource types supported.
*/
@@ -412,7 +416,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
String queueName = PREFIX + queue + DOT + ORDERING_POLICY + DOT;
return queueName;
}
-
+
+ static String getUserPrefix(String user) {
+ return PREFIX + "user." + user + DOT;
+ }
+
private String getNodeLabelPrefix(String queue, String label) {
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
return getQueuePrefix(queue);
@@ -1392,6 +1400,31 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST);
}
+ public Integer getMaxParallelAppsForQueue(String queue) {
+ int defaultMaxParallelAppsForQueue =
+ getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
+ DEFAULT_MAX_PARALLEL_APPLICATIONS);
+
+ String maxParallelAppsForQueue = get(getQueuePrefix(queue)
+ + MAX_PARALLEL_APPLICATIONS);
+
+ return (maxParallelAppsForQueue != null) ?
+ Integer.parseInt(maxParallelAppsForQueue)
+ : defaultMaxParallelAppsForQueue;
+ }
+
+ public Integer getMaxParallelAppsForUser(String user) {
+ int defaultMaxParallelAppsForUser =
+ getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
+ DEFAULT_MAX_PARALLEL_APPLICATIONS);
+ String maxParallelAppsForUser = get(getUserPrefix(user)
+ + MAX_PARALLEL_APPLICATIONS);
+
+ return (maxParallelAppsForUser != null) ?
+ Integer.parseInt(maxParallelAppsForUser)
+ : defaultMaxParallelAppsForUser;
+ }
+
private static final String PREEMPTION_CONFIG_PREFIX =
"yarn.resourcemanager.monitor.capacity.preemption.";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9f0caf2..4d83538 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -129,6 +129,9 @@ public class LeafQueue extends AbstractCSQueue {
List<AppPriorityACLGroup> priorityAcls =
new ArrayList<AppPriorityACLGroup>();
+ private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
+ private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
+
@SuppressWarnings({ "unchecked", "rawtypes" })
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -159,6 +162,7 @@ public class LeafQueue extends AbstractCSQueue {
setupQueueConfigs(clusterResource, csContext.getConfiguration());
}
+ @SuppressWarnings("checkstyle:nowhitespaceafter")
protected void setupQueueConfigs(Resource clusterResource,
CapacitySchedulerConfiguration conf) throws
IOException {
@@ -289,7 +293,9 @@ public class LeafQueue extends AbstractCSQueue {
+ " (int)(configuredMaximumSystemApplications * absoluteCapacity)]"
+ "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser
+ " [= (int)(maxApplications * (userLimit / 100.0f) * "
- + "userLimitFactor) ]" + "\n" + "usedCapacity = "
+ + "userLimitFactor) ]" + "\n"
+ + "maxParallelApps = " + getMaxParallelApps() + "\n"
+ + "usedCapacity = " +
+ queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / "
+ "(clusterResourceMemory * absoluteCapacity)]" + "\n"
+ "absoluteUsedCapacity = " + absoluteUsedCapacity
@@ -386,7 +392,8 @@ public class LeafQueue extends AbstractCSQueue {
public int getNumApplications() {
readLock.lock();
try {
- return getNumPendingApplications() + getNumActiveApplications();
+ return getNumPendingApplications() + getNumActiveApplications() +
+ getNumNonRunnableApps();
} finally {
readLock.unlock();
}
@@ -887,16 +894,28 @@ public class LeafQueue extends AbstractCSQueue {
writeLock.unlock();
}
}
-
+
private void addApplicationAttempt(FiCaSchedulerApp application,
User user) {
writeLock.lock();
try {
+ applicationAttemptMap.put(application.getApplicationAttemptId(),
+ application);
+
+ if (application.isRunnable()) {
+ runnableApps.add(application);
+ LOG.debug("Adding runnable application: {}",
+ application.getApplicationAttemptId());
+ } else {
+ nonRunnableApps.add(application);
+ LOG.info("Application attempt {} is not runnable,"
+ + " parallel limit reached", application.getApplicationAttemptId());
+ return;
+ }
+
// Accept
user.submitApplication();
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
- applicationAttemptMap.put(application.getApplicationAttemptId(),
- application);
// Activate applications
if (Resources.greaterThan(resourceCalculator, lastClusterResource,
@@ -917,7 +936,9 @@ public class LeafQueue extends AbstractCSQueue {
.getPendingApplications() + " #user-active-applications: " + user
.getActiveApplications() + " #queue-pending-applications: "
+ getNumPendingApplications() + " #queue-active-applications: "
- + getNumActiveApplications());
+ + getNumActiveApplications()
+ + " #queue-nonrunnable-applications: "
+ + getNumNonRunnableApps());
} finally {
writeLock.unlock();
}
@@ -950,6 +971,15 @@ public class LeafQueue extends AbstractCSQueue {
// which is caused by wrong invoking order, will fix UT separately
User user = usersManager.getUserAndAddIfAbsent(userName);
+ boolean runnable = runnableApps.remove(application);
+ if (!runnable) {
+ // removeNonRunnableApp acquires the write lock again, which is fine
+ if (!removeNonRunnableApp(application)) {
+ LOG.error("Given app to remove " + application +
+ " does not exist in queue " + getQueuePath());
+ }
+ }
+
String partitionName = application.getAppAMNodePartitionName();
boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
if (!wasActive) {
@@ -2229,4 +2259,43 @@ public class LeafQueue extends AbstractCSQueue {
usedSeconds);
metrics.updatePreemptedForCustomResources(containerResource);
}
+
+ @Override
+ int getNumRunnableApps() {
+ readLock.lock();
+ try {
+ return runnableApps.size();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ int getNumNonRunnableApps() {
+ readLock.lock();
+ try {
+ return nonRunnableApps.size();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ boolean removeNonRunnableApp(FiCaSchedulerApp app) {
+ writeLock.lock();
+ try {
+ return nonRunnableApps.remove(app);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ List<FiCaSchedulerApp> getCopyOfNonRunnableAppSchedulables() {
+ List<FiCaSchedulerApp> appsToReturn = new ArrayList<>();
+ readLock.lock();
+ try {
+ appsToReturn.addAll(nonRunnableApps);
+ } finally {
+ readLock.unlock();
+ }
+ return appsToReturn;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 95f5468..bbb80ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -93,6 +93,8 @@ public class ParentQueue extends AbstractCSQueue {
private long lastSkipQueueDebugLoggingTimestamp = -1;
+ private int runnableApps;
+
public ParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
@@ -1383,4 +1385,32 @@ public class ParentQueue extends AbstractCSQueue {
public QueueOrderingPolicy getQueueOrderingPolicy() {
return queueOrderingPolicy;
}
+
+ @Override
+ int getNumRunnableApps() {
+ readLock.lock();
+ try {
+ return runnableApps;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ void incrementRunnableApps() {
+ writeLock.lock();
+ try {
+ runnableApps++;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ void decrementRunnableApps() {
+ writeLock.lock();
+ try {
+ runnableApps--;
+ } finally {
+ writeLock.unlock();
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 8f6fb63..cf6ffd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -112,6 +112,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private AbstractContainerAllocator containerAllocator;
+ private boolean runnable;
+
/**
* to hold the message if its app doesn't not get container from a node
*/
@@ -139,6 +141,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
ActivitiesManager activitiesManager) {
super(applicationAttemptId, user, queue, abstractUsersManager, rmContext);
+ this.runnable = true;
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
@@ -1219,4 +1222,22 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
writeLock.unlock();
}
}
+
+ public void setRunnable(boolean runnable) {
+ writeLock.lock();
+ try {
+ this.runnable = runnable;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public boolean isRunnable() {
+ readLock.lock();
+ try {
+ return runnable;
+ } finally {
+ readLock.unlock();
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java
index ff5738c..389dd62 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSMaxRunningAppsEnforcer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
@@ -175,6 +176,9 @@ public class TestReservationSystem extends
CapacityScheduler cs = Mockito.spy(new CapacityScheduler());
cs.setConf(conf);
+ CSMaxRunningAppsEnforcer enforcer =
+ Mockito.mock(CSMaxRunningAppsEnforcer.class);
+ cs.setMaxRunningAppsEnforcer(enforcer);
mockRMContext = ReservationSystemTestUtil.createRMContext(conf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index bad943c..93d8d5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -184,6 +184,7 @@ public class TestApplicationLimits {
doReturn(amResource).when(application).getAMResource(
CommonNodeLabelsManager.NO_LABEL);
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod();
+ when(application.isRunnable()).thenReturn(true);
return application;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java
new file mode 100644
index 0000000..e3c05a1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCSMaxRunningAppsEnforcer {
+ private CapacitySchedulerQueueManager queueManager;
+ private CSMaxRunningAppsEnforcer maxAppsEnforcer;
+ private int appNum;
+ private ControlledClock clock;
+ private RMContext rmContext;
+ private CapacityScheduler scheduler;
+ private ActivitiesManager activitiesManager;
+ private CapacitySchedulerConfiguration csConfig;
+
+ @Before
+ public void setup() throws IOException {
+ csConfig = new CapacitySchedulerConfiguration();
+ rmContext = mock(RMContext.class);
+ when(rmContext.getYarnConfiguration()).thenReturn(csConfig);
+ when(rmContext.getRMApps()).thenReturn(new ConcurrentHashMap<>());
+ clock = new ControlledClock();
+ scheduler = mock(CapacityScheduler.class);
+ when(rmContext.getScheduler()).thenReturn(scheduler);
+ when(scheduler.getConf()).thenReturn(csConfig);
+ when(scheduler.getConfig()).thenReturn(csConfig);
+ when(scheduler.getConfiguration()).thenReturn(csConfig);
+ when(scheduler.getResourceCalculator()).thenReturn(
+ new DefaultResourceCalculator());
+ when(scheduler.getRMContext()).thenReturn(rmContext);
+ when(scheduler.getClusterResource())
+ .thenReturn(Resource.newInstance(16384, 8));
+ when(scheduler.getMinimumAllocation())
+ .thenReturn(Resource.newInstance(1024, 1));
+ when(scheduler.getMinimumResourceCapability())
+ .thenReturn(Resource.newInstance(1024, 1));
+ activitiesManager = mock(ActivitiesManager.class);
+ maxAppsEnforcer = new CSMaxRunningAppsEnforcer(scheduler);
+ appNum = 0;
+ setupQueues(csConfig);
+ RMNodeLabelsManager labelManager = mock(RMNodeLabelsManager.class);
+ AppPriorityACLsManager appPriorityACLManager =
+ mock(AppPriorityACLsManager.class);
+ when(rmContext.getNodeLabelManager()).thenReturn(labelManager);
+ when(labelManager.getResourceByLabel(anyString(), any(Resource.class)))
+ .thenReturn(Resource.newInstance(16384, 8));
+ queueManager = new CapacitySchedulerQueueManager(csConfig, labelManager,
+ appPriorityACLManager);
+ queueManager.setCapacitySchedulerContext(scheduler);
+ queueManager.initializeQueues(csConfig);
+ }
+
+ private void setupQueues(CapacitySchedulerConfiguration config) {
+ config.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] {"queue1", "queue2"});
+ config.setQueues("root.queue1", new String[] {"subqueue1", "subqueue2"});
+ config.setQueues("root.queue1.subqueue1", new String[] {"leaf1"});
+ config.setQueues("root.queue1.subqueue2", new String[] {"leaf2"});
+ config.setFloat(PREFIX + "root.capacity", 100.0f);
+ config.setFloat(PREFIX + "root.queue1.capacity", 50.0f);
+ config.setFloat(PREFIX + "root.queue2.capacity", 50.0f);
+ config.setFloat(PREFIX + "root.queue1.subqueue1.capacity", 50.0f);
+ config.setFloat(PREFIX + "root.queue1.subqueue2.capacity", 50.0f);
+ config.setFloat(PREFIX + "root.queue1.subqueue1.leaf1.capacity", 100.0f);
+ config.setFloat(PREFIX + "root.queue1.subqueue2.leaf2.capacity", 100.0f);
+ }
+
+ private FiCaSchedulerApp addApp(LeafQueue queue, String user) {
+ ApplicationId appId = ApplicationId.newInstance(0, appNum++);
+ ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0);
+
+ FiCaSchedulerApp attempt = new FiCaSchedulerApp(attId,
+ user, queue, queue.getAbstractUsersManager(),
+ rmContext, Priority.newInstance(0), false,
+ activitiesManager) {
+
+ private final long startTime = clock.getTime();
+
+ @Override
+ public long getStartTime() {
+ return startTime;
+ }
+ };
+
+ maxAppsEnforcer.checkRunnabilityWithUpdate(attempt);
+ maxAppsEnforcer.trackApp(attempt);
+
+ queue.submitApplicationAttempt(attempt, attempt.getUser());
+
+ return attempt;
+ }
+
+ private void removeApp(FiCaSchedulerApp attempt) {
+ LeafQueue queue = attempt.getCSLeafQueue();
+ queue.finishApplicationAttempt(attempt, queue.getQueuePath());
+ maxAppsEnforcer.untrackApp(attempt);
+ maxAppsEnforcer.updateRunnabilityOnAppRemoval(attempt);
+ }
+
+ @Test
+ public void testRemoveDoesNotEnableAnyApp() {
+ ParentQueue root =
+ (ParentQueue) queueManager.getRootQueue();
+ LeafQueue leaf1 = (LeafQueue) queueManager
+ .getQueueByFullName("root.queue1.subqueue1.leaf1");
+ LeafQueue leaf2 = (LeafQueue) queueManager
+ .getQueueByFullName("root.queue1.subqueue2.leaf2");
+ root.setMaxParallelApps(2);
+ leaf1.setMaxParallelApps(1);
+ leaf2.setMaxParallelApps(1);
+
+ FiCaSchedulerApp app1 = addApp(leaf1, "user");
+ addApp(leaf2, "user");
+ addApp(leaf2, "user");
+ assertEquals(1, leaf1.getNumRunnableApps());
+ assertEquals(1, leaf2.getNumRunnableApps());
+ assertEquals(1, leaf2.getNumNonRunnableApps());
+
+ removeApp(app1);
+ assertEquals(0, leaf1.getNumRunnableApps());
+ assertEquals(1, leaf2.getNumRunnableApps());
+ assertEquals(1, leaf2.getNumNonRunnableApps());
+ }
+
+ @Test
+ public void testRemoveEnablesAppOnCousinQueue() {
+ LeafQueue leaf1 = (LeafQueue) queueManager
+ .getQueueByFullName("root.queue1.subqueue1.leaf1");
+ LeafQueue leaf2 = (LeafQueue) queueManager
+ .getQueueByFullName("root.queue1.subqueue2.leaf2");
+ ParentQueue queue1 = (ParentQueue) queueManager
+ .getQueueByFullName("root.queue1");
+ queue1.setMaxParallelApps(2);
+
+ FiCaSchedulerApp app1 = addApp(leaf1, "user");
+ addApp(leaf2, "user");
+ addApp(leaf2, "user");
+ assertEquals(1, leaf1.getNumRunnableApps());
+ assertEquals(1, leaf2.getNumRunnableApps());
+ assertEquals(1, leaf2.getNumNonRunnableApps());
+
+ removeApp(app1);
+ assertEquals(0, leaf1.getNumRunnableApps());
+ assertEquals(2, leaf2.getNumRunnableApps());
+ assertEquals(0, leaf2.getNumNonRunnableApps());
+ }
+
+ @Test
+ public void testRemoveEnablesOneByQueueOneByUser() {
+ LeafQueue leaf1 = (LeafQueue) queueManager
+ .getQueueByFullName("root.queue1.subqueue1.leaf1");
+ LeafQueue leaf2 = (LeafQueue) queueManager
+ .getQueueByFullName("root.queue1.subqueue2.leaf2");
+ leaf1.setMaxParallelApps(2);
+ //userMaxApps.put("user1", 1);
+ csConfig.setInt(PREFIX + "user.user1.max-parallel-apps", 1);
+
+ FiCaSchedulerApp app1 = addApp(leaf1, "user1");
+ addApp(leaf1, "user2");
+ addApp(leaf1, "user3");
+ addApp(leaf2, "user1");
+ assertEquals(2, leaf1.getNumRunnableApps());
+ assertEquals(1, leaf1.getNumNonRunnableApps());
+ assertEquals(1, leaf2.getNumNonRunnableApps());
+
+ removeApp(app1);
+ assertEquals(2, leaf1.getNumRunnableApps());
+ assertEquals(1, leaf2.getNumRunnableApps());
+ assertEquals(0, leaf1.getNumNonRunnableApps());
+ assertEquals(0, leaf2.getNumNonRunnableApps());
+ }
+
+ @Test
+ public void testRemoveEnablingOrderedByStartTime() {
+ LeafQueue leaf1 = (LeafQueue) queueManager
+ .getQueueByFullName("root.queue1.subqueue1.leaf1");
+ LeafQueue leaf2 = (LeafQueue) queueManager
+ .getQueueByFullName("root.queue1.subqueue2.leaf2");
+ ParentQueue queue1 = (ParentQueue) queueManager
+ .getQueueByFullName("root.queue1");
+ queue1.setMaxParallelApps(2);
+ FiCaSchedulerApp app1 = addApp(leaf1, "user");
+ addApp(leaf2, "user");
+ addApp(leaf2, "user");
+ clock.tickSec(20);
+ addApp(leaf1, "user");
+ assertEquals(1, leaf1.getNumRunnableApps());
+ assertEquals(1, leaf2.getNumRunnableApps());
+ assertEquals(1, leaf1.getNumNonRunnableApps());
+ assertEquals(1, leaf2.getNumNonRunnableApps());
+ removeApp(app1);
+ assertEquals(0, leaf1.getNumRunnableApps());
+ assertEquals(2, leaf2.getNumRunnableApps());
+ assertEquals(0, leaf2.getNumNonRunnableApps());
+ }
+
+ @Test
+ public void testMultipleAppsWaitingOnCousinQueue() {
+ LeafQueue leaf1 = (LeafQueue) queueManager
+ .getQueueByFullName("root.queue1.subqueue1.leaf1");
+ LeafQueue leaf2 = (LeafQueue) queueManager
+ .getQueueByFullName("root.queue1.subqueue2.leaf2");
+ ParentQueue queue1 = (ParentQueue) queueManager
+ .getQueueByFullName("root.queue1");
+ queue1.setMaxParallelApps(2);
+ FiCaSchedulerApp app1 = addApp(leaf1, "user");
+ addApp(leaf2, "user");
+ addApp(leaf2, "user");
+ addApp(leaf2, "user");
+ assertEquals(1, leaf1.getNumRunnableApps());
+ assertEquals(1, leaf2.getNumRunnableApps());
+ assertEquals(2, leaf2.getNumNonRunnableApps());
+ removeApp(app1);
+ assertEquals(0, leaf1.getNumRunnableApps());
+ assertEquals(2, leaf2.getNumRunnableApps());
+ assertEquals(1, leaf2.getNumNonRunnableApps());
+ }
+
+ @Test
+ public void testMultiListStartTimeIteratorEmptyAppLists() {
+ List<List<FiCaSchedulerApp>> lists =
+ new ArrayList<List<FiCaSchedulerApp>>();
+ lists.add(Arrays.asList(mockAppAttempt(1)));
+ lists.add(Arrays.asList(mockAppAttempt(2)));
+ Iterator<FiCaSchedulerApp> iter =
+ new CSMaxRunningAppsEnforcer.MultiListStartTimeIterator(lists);
+ assertEquals(1, iter.next().getStartTime());
+ assertEquals(2, iter.next().getStartTime());
+ }
+
+ private FiCaSchedulerApp mockAppAttempt(long startTime) {
+ FiCaSchedulerApp schedApp = mock(FiCaSchedulerApp.class);
+ when(schedApp.getStartTime()).thenReturn(startTime);
+ return schedApp;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMaxParallelApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMaxParallelApps.java
new file mode 100644
index 0000000..d2e3278
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMaxParallelApps.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestCapacitySchedulerMaxParallelApps {
+ private CapacitySchedulerConfiguration conf;
+ private MockRM rm;
+ private MockNM nm1;
+
+ private RMApp app1;
+ private MockAM am1;
+ private RMApp app2;
+ private MockAM am2;
+ private RMApp app3;
+ private RMAppAttempt attempt3;
+ private RMApp app4;
+ private RMAppAttempt attempt4;
+
+ private ParentQueue rootQueue;
+ private LeafQueue defaultQueue;
+
+ @Before
+ public void setUp() {
+ CapacitySchedulerConfiguration config =
+ new CapacitySchedulerConfiguration();
+ config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+ DominantResourceCalculator.class.getName());
+
+ conf = new CapacitySchedulerConfiguration(config);
+ }
+
+ @After
+ public void after() {
+ if (rm != null) {
+ rm.stop();
+ }
+ }
+
+ @Test(timeout = 30000)
+ public void testMaxParallelAppsExceedsQueueSetting() throws Exception {
+ conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
+ executeCommonStepsAndChecks();
+ testWhenSettingsExceeded();
+ }
+
+ @Test(timeout = 30000)
+ public void testMaxParallelAppsExceedsDefaultQueueSetting()
+ throws Exception {
+ conf.setInt("yarn.scheduler.capacity.max-parallel-apps", 2);
+ executeCommonStepsAndChecks();
+ testWhenSettingsExceeded();
+ }
+
+ @Test(timeout = 30000)
+ public void testMaxParallelAppsExceedsUserSetting() throws Exception {
+ conf.setInt("yarn.scheduler.capacity.user.testuser.max-parallel-apps", 2);
+ executeCommonStepsAndChecks();
+ testWhenSettingsExceeded();
+ }
+
+ @Test(timeout = 30000)
+ public void testMaxParallelAppsExceedsDefaultUserSetting() throws Exception {
+ conf.setInt("yarn.scheduler.capacity.user.max-parallel-apps", 2);
+ executeCommonStepsAndChecks();
+ testWhenSettingsExceeded();
+ }
+
+ @Test(timeout = 30000)
+ public void testMaxParallelAppsWhenReloadingConfig() throws Exception {
+ conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
+
+ executeCommonStepsAndChecks();
+
+ RMContext rmContext = rm.getRMContext();
+ // Disable parallel apps setting + max out AM percent
+ conf.unset("yarn.scheduler.capacity.root.default.max-parallel-apps");
+ conf.setFloat(PREFIX + "maximum-am-resource-percent", 1.0f);
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ cs.reinitialize(conf, rmContext);
+
+ // Both app #3 and app #4 should transition to RUNNABLE
+ launchAMandWaitForRunning(app3, attempt3, nm1);
+ launchAMandWaitForRunning(app4, attempt4, nm1);
+ verifyRunningAndAcceptedApps(4, 0);
+ }
+
+ @Test(timeout = 30000)
+ public void testMaxAppsReachedWithNonRunnableApps() throws Exception {
+ conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
+ conf.setInt("yarn.scheduler.capacity.root.default.maximum-applications", 4);
+ executeCommonStepsAndChecks();
+
+ RMApp app5 = MockRMAppSubmitter.submit(rm,
+ MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
+ .withAppName("app5")
+ .withUser("testuser")
+ .withQueue("default")
+ .withWaitForAppAcceptedState(false)
+ .build());
+
+ rm.waitForState(app5.getApplicationId(), RMAppState.FAILED);
+ }
+
+ private void executeCommonStepsAndChecks() throws Exception {
+ rm = new MockRM(conf);
+ rm.start();
+
+ nm1 = rm.registerNode("h1:1234", 4096, 8);
+ rm.registerNode("h2:1234", 4096, 8);
+ rm.registerNode("h3:1234", 4096, 8);
+
+ rm.drainEvents();
+
+ app1 = MockRMAppSubmitter.submit(rm,
+ MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
+ .withAppName("app1")
+ .withUser("testuser")
+ .withQueue("default")
+ .build());
+
+ am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+ app2 = MockRMAppSubmitter.submit(rm,
+ MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
+ .withAppName("app2")
+ .withUser("testuser")
+ .withQueue("default")
+ .build());
+ am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+
+ app3 = MockRMAppSubmitter.submit(rm,
+ MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
+ .withAppName("app3")
+ .withUser("testuser")
+ .withQueue("default")
+ .build());
+ attempt3 = MockRM.waitForAttemptScheduled(app3, rm);
+
+ app4 = MockRMAppSubmitter.submit(rm,
+ MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
+ .withAppName("app4")
+ .withUser("testuser")
+ .withQueue("default")
+ .build());
+ attempt4 = MockRM.waitForAttemptScheduled(app4, rm);
+
+ // Check that app attempt #3 and #4 are non-runnable
+ rootQueue = getRootQueue();
+ defaultQueue = getDefaultQueue();
+ Set<ApplicationAttemptId> nonRunnables =
+ Sets.newHashSet(
+ attempt3.getAppAttemptId(),
+ attempt4.getAppAttemptId());
+ verifyRunnableAppsInParent(rootQueue, 2);
+ verifyRunnableAppsInLeaf(defaultQueue, 2, nonRunnables);
+ verifyRunningAndAcceptedApps(2, 2);
+ }
+
+ private void testWhenSettingsExceeded() throws Exception {
+ // Stop app #1
+ unregisterAMandWaitForFinish(app1, am1, nm1);
+
+ // Launch app #3
+ launchAMandWaitForRunning(app3, attempt3, nm1);
+
+ // Check that attempt #4 is still non-runnable
+ verifyRunnableAppsInParent(rootQueue, 2);
+ verifyRunnableAppsInLeaf(defaultQueue, 2,
+ Collections.singleton(attempt4.getAppAttemptId()));
+ verifyRunningAndAcceptedApps(2, 1);
+
+ // Stop app #2
+ unregisterAMandWaitForFinish(app2, am2, nm1);
+
+ // Launch app #4
+ launchAMandWaitForRunning(app4, attempt4, nm1);
+ verifyRunnableAppsInParent(rootQueue, 2);
+ verifyRunnableAppsInLeaf(defaultQueue, 2,
+ Collections.emptySet());
+ verifyRunningAndAcceptedApps(2, 0);
+ }
+
+ @SuppressWarnings("checkstyle:hiddenfield")
+ private LeafQueue getDefaultQueue() {
+ CSQueue defaultQueue =
+ ((CapacityScheduler) rm.getResourceScheduler()).getQueue("default");
+
+ return (LeafQueue) defaultQueue;
+ }
+
+ private ParentQueue getRootQueue() {
+ CSQueue root =
+ ((CapacityScheduler) rm.getResourceScheduler()).getQueue("root");
+
+ return (ParentQueue) root;
+ }
+
+ private void verifyRunnableAppsInParent(ParentQueue queue,
+ int expectedRunnable) {
+ assertEquals("Num of runnable apps", expectedRunnable,
+ queue.getNumRunnableApps());
+ }
+
+ private void verifyRunnableAppsInLeaf(LeafQueue queue, int expectedRunnable,
+ Set<ApplicationAttemptId> nonRunnableIds) {
+ assertEquals("Num of runnable apps", expectedRunnable,
+ queue.getNumRunnableApps());
+
+ queue.getCopyOfNonRunnableAppSchedulables()
+ .stream()
+ .map(fca -> fca.getApplicationAttemptId())
+ .forEach(id -> assertTrue(id + " not found as non-runnable",
+ nonRunnableIds.contains(id)));
+ }
+
+ private void verifyRunningAndAcceptedApps(int expectedRunning,
+ int expectedAccepted) throws YarnException {
+ GetApplicationsRequest request = GetApplicationsRequest.newInstance();
+
+ GetApplicationsResponse resp =
+ rm.getClientRMService().getApplications(request);
+
+ List<ApplicationReport> apps = resp.getApplicationList();
+
+ long runningCount = apps
+ .stream()
+ .filter(report ->
+ report.getYarnApplicationState() == YarnApplicationState.RUNNING)
+ .count();
+
+ long acceptedCount = apps
+ .stream()
+ .filter(report ->
+ report.getYarnApplicationState() == YarnApplicationState.ACCEPTED)
+ .count();
+
+ assertEquals("Running apps count", expectedRunning, runningCount);
+ assertEquals("Accepted apps count", expectedAccepted, acceptedCount);
+ }
+
+ private void unregisterAMandWaitForFinish(RMApp app, MockAM am, MockNM nm)
+ throws Exception {
+ am.unregisterAppAttempt();
+ nm.nodeHeartbeat(app.getCurrentAppAttempt().getAppAttemptId(), 1,
+ ContainerState.COMPLETE);
+ rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
+ RMAppAttemptState.FINISHED);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private MockAM launchAMandWaitForRunning(RMApp app, RMAppAttempt attempt,
+ MockNM nm) throws Exception {
+ nm.nodeHeartbeat(true);
+ ((AbstractYarnScheduler)rm.getResourceScheduler()).update();
+ rm.drainEvents();
+ nm.nodeHeartbeat(true);
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
+ rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+
+ return am;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 3353eac..f664e03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -456,6 +456,8 @@ public class TestLeafQueue {
@Test
public void testAppAttemptMetrics() throws Exception {
+ CSMaxRunningAppsEnforcer enforcer = mock(CSMaxRunningAppsEnforcer.class);
+ cs.setMaxRunningAppsEnforcer(enforcer);
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
index 13957e9..aa3b591 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
@@ -218,6 +218,7 @@ public class TestQueueState {
CommonNodeLabelsManager.NO_LABEL);
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
.thenCallRealMethod();
+ when(application.isRunnable()).thenReturn(true);
return application;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
index a4c1300..e893717 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
@@ -157,6 +157,7 @@ public class TestQueueStateManager {
CommonNodeLabelsManager.NO_LABEL);
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
.thenCallRealMethod();
+ when(application.isRunnable()).thenReturn(true);
return application;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org