You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2020/06/19 12:50:44 UTC

[hadoop] branch trunk updated: YARN-9930. Support max running app logic for CapacityScheduler. Contributed by Peter Bacsko

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4698414  YARN-9930. Support max running app logic for CapacityScheduler. Contributed by Peter Bacsko
4698414 is described below

commit 469841446f921f3da5bbd96cf83b3a808dde8084
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Fri Jun 19 14:50:24 2020 +0200

    YARN-9930. Support max running app logic for CapacityScheduler. Contributed by Peter Bacsko
---
 .../scheduler/capacity/AbstractCSQueue.java        |  16 +
 .../capacity/CSMaxRunningAppsEnforcer.java         | 436 +++++++++++++++++++++
 .../scheduler/capacity/CapacityScheduler.java      |  19 +-
 .../capacity/CapacitySchedulerConfiguration.java   |  35 +-
 .../scheduler/capacity/LeafQueue.java              |  81 +++-
 .../scheduler/capacity/ParentQueue.java            |  30 ++
 .../scheduler/common/fica/FiCaSchedulerApp.java    |  21 +
 .../reservation/TestReservationSystem.java         |   4 +
 .../scheduler/capacity/TestApplicationLimits.java  |   1 +
 .../capacity/TestCSMaxRunningAppsEnforcer.java     | 278 +++++++++++++
 .../TestCapacitySchedulerMaxParallelApps.java      | 312 +++++++++++++++
 .../scheduler/capacity/TestLeafQueue.java          |   2 +
 .../scheduler/capacity/TestQueueState.java         |   1 +
 .../scheduler/capacity/TestQueueStateManager.java  |   1 +
 14 files changed, 1229 insertions(+), 8 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 0a4a14f..968d971 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -146,6 +146,7 @@ public abstract class AbstractCSQueue implements CSQueue {
 
   volatile Priority priority = Priority.newInstance(0);
   private Map<String, Float> userWeights = new HashMap<String, Float>();
+  private int maxParallelApps;
 
   public AbstractCSQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -390,6 +391,11 @@ public abstract class AbstractCSQueue implements CSQueue {
       // and queue setting
       setupMaximumAllocation(configuration);
 
+      // Max parallel apps
+      int queueMaxParallelApps =
+          configuration.getMaxParallelAppsForQueue(getQueuePath());
+      setMaxParallelApps(queueMaxParallelApps);
+
       // initialized the queue state based on previous state, configured state
       // and its parent state.
       QueueState previous = getState();
@@ -1431,4 +1437,14 @@ public abstract class AbstractCSQueue implements CSQueue {
   public boolean getDefaultAppLifetimeWasSpecifiedInConfig() {
     return defaultAppLifetimeWasSpecifiedInConfig;
   }
+
+  public void setMaxParallelApps(int maxParallelApps) {
+    this.maxParallelApps = maxParallelApps;
+  }
+
+  public int getMaxParallelApps() {
+    return maxParallelApps;
+  }
+
+  abstract int getNumRunnableApps();
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java
new file mode 100644
index 0000000..d1a62b4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+
+/**
+ * Handles tracking and enforcement for user and queue maxRunningApps
+ * constraints.
+ */
+public class CSMaxRunningAppsEnforcer {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CSMaxRunningAppsEnforcer.class);
+
+  private final CapacityScheduler scheduler;
+
+  // Tracks the number of running applications by user.
+  private final Map<String, Integer> usersNumRunnableApps;
+
+  private final ListMultimap<String, FiCaSchedulerApp> usersNonRunnableApps;
+
+  public CSMaxRunningAppsEnforcer(CapacityScheduler scheduler) {
+    this.scheduler = scheduler;
+    this.usersNumRunnableApps = new HashMap<String, Integer>();
+    this.usersNonRunnableApps = ArrayListMultimap.create();
+  }
+
+  /**
+   * Checks whether making the application runnable would exceed any
+   * maxRunningApps limits. Also sets the "runnable" flag on the
+   * attempt.
+   *
+   * @param attempt the app attempt being checked
+   * @return true if the application is runnable; false otherwise
+   */
+  public boolean checkRunnabilityWithUpdate(
+      FiCaSchedulerApp attempt) {
+    boolean attemptCanRun = !exceedUserMaxParallelApps(attempt.getUser())
+        && !exceedQueueMaxParallelApps(attempt.getCSLeafQueue());
+
+    attempt.setRunnable(attemptCanRun);
+
+    return attemptCanRun;
+  }
+
+  /**
+   * Checks whether the number of user runnable apps exceeds the limitation.
+   *
+   * @param user the user name
+   * @return true if the number hits the limit; false otherwise
+   */
+  private boolean exceedUserMaxParallelApps(String user) {
+    Integer userNumRunnable = usersNumRunnableApps.get(user);
+    if (userNumRunnable == null) {
+      userNumRunnable = 0;
+    }
+    if (userNumRunnable >= getUserMaxParallelApps(user)) {
+      LOG.info("Maximum runnable apps exceeded for user {}", user);
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Checks, for the given queue and each of its ancestors, whether the number
+   * of runnable apps has reached the queue's max parallel apps limit.
+   *
+   * @param queue the current queue
+   * @return true if the number hits the limit; false otherwise
+   */
+  private boolean exceedQueueMaxParallelApps(AbstractCSQueue queue) {
+    // Check queue and all parent queues
+    while (queue != null) {
+      if (queue.getNumRunnableApps() >= queue.getMaxParallelApps()) {
+        LOG.info("Maximum runnable apps exceeded for queue {}",
+            queue.getQueuePath());
+        return true;
+      }
+      queue = (AbstractCSQueue) queue.getParent();
+    }
+
+    return false;
+  }
+
+  public void trackApp(FiCaSchedulerApp app) {
+    if (app.isRunnable()) {
+      trackRunnableApp(app);
+    } else {
+      trackNonRunnableApp(app);
+    }
+  }
+  /**
+   * Tracks the given new runnable app for purposes of maintaining max running
+   * app limits.
+   */
+  private void trackRunnableApp(FiCaSchedulerApp app) {
+    String user = app.getUser();
+    AbstractCSQueue queue = (AbstractCSQueue) app.getQueue();
+    // Increment running counts for all parent queues
+    ParentQueue parent = (ParentQueue) queue.getParent();
+    while (parent != null) {
+      parent.incrementRunnableApps();
+      parent = (ParentQueue) parent.getParent();
+    }
+
+    Integer userNumRunnable = usersNumRunnableApps.get(user);
+    usersNumRunnableApps.put(user, (userNumRunnable == null ? 0
+        : userNumRunnable) + 1);
+  }
+
+  /**
+   * Tracks the given new non runnable app so that it can be made runnable when
+   * it would not violate max running app limits.
+   */
+  private void trackNonRunnableApp(FiCaSchedulerApp app) {
+    String user = app.getUser();
+    usersNonRunnableApps.put(user, app);
+  }
+
+  /**
+   * This is called after reloading the configuration, when the scheduler is
+   * reinitialized.
+   *
+   * Checks to see whether any non-runnable applications become runnable
+   * now that the max running apps limits of the queues may have changed.
+   *
+   * Runs in O(n) where n is the number of apps that are non-runnable and in
+   * the queues that went from having no slack to having slack.
+   */
+
+  public void updateRunnabilityOnReload() {
+    ParentQueue rootQueue = (ParentQueue) scheduler.getRootQueue();
+    List<List<FiCaSchedulerApp>> appsNowMaybeRunnable =
+        new ArrayList<List<FiCaSchedulerApp>>();
+
+    gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable);
+
+    updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Checks to see whether any other applications are runnable now that the
+   * given application has been removed from its queue, and makes them so.
+   *
+   * Runs in O(n log(n)) where n is the number of queues that are under the
+   * highest queue that went from having no slack to having slack.
+   */
+  public void updateRunnabilityOnAppRemoval(FiCaSchedulerApp app) {
+    // childqueueX might have no pending apps itself, but if a queue higher up
+    // in the hierarchy parentqueueY has a maxRunningApps set, an app completion
+    // in childqueueX could allow an app in some other distant child of
+    // parentqueueY to become runnable.
+    // An app removal will only possibly allow another app to become runnable if
+    // the queue was already at its max before the removal.
+    // Thus we find the queue highest in the tree (the app's leaf queue or one
+    // of its ancestors) that was at its maxRunningApps before the removal.
+    LeafQueue queue = app.getCSLeafQueue();
+    AbstractCSQueue highestQueueWithAppsNowRunnable =
+        (queue.getNumRunnableApps() == queue.getMaxParallelApps() - 1)
+        ? queue : null;
+
+    ParentQueue parent = (ParentQueue) queue.getParent();
+    while (parent != null) {
+      if (parent.getNumRunnableApps() == parent.getMaxParallelApps() - 1) {
+        highestQueueWithAppsNowRunnable = parent;
+      }
+      parent = (ParentQueue) parent.getParent();
+    }
+
+    List<List<FiCaSchedulerApp>> appsNowMaybeRunnable =
+        new ArrayList<List<FiCaSchedulerApp>>();
+
+    // Compile lists of apps which may now be runnable
+    // We gather lists instead of building a set of all non-runnable apps so
+    // that this whole operation can be O(number of queues) instead of
+    // O(number of apps)
+    if (highestQueueWithAppsNowRunnable != null) {
+      gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable,
+          appsNowMaybeRunnable);
+    }
+    String user = app.getUser();
+    Integer userNumRunning = usersNumRunnableApps.get(user);
+    if (userNumRunning == null) {
+      userNumRunning = 0;
+    }
+    if (userNumRunning == getUserMaxParallelApps(user) - 1) {
+      List<FiCaSchedulerApp> userWaitingApps = usersNonRunnableApps.get(user);
+      if (userWaitingApps != null) {
+        appsNowMaybeRunnable.add(userWaitingApps);
+      }
+    }
+
+    updateAppsRunnability(appsNowMaybeRunnable,
+        appsNowMaybeRunnable.size());
+  }
+
+  /**
+   * Checks to see whether applications are runnable now by iterating
+   * through each one of them and checking if the queue and user have slack.
+   *
+   * If we know how many apps can become runnable, there is no need to iterate
+   * through all apps; maxRunnableApps is used to break out of the iteration.
+   */
+  private void updateAppsRunnability(List<List<FiCaSchedulerApp>>
+      appsNowMaybeRunnable, int maxRunnableApps) {
+    // Scan through and check whether this means that any apps are now runnable
+    Iterator<FiCaSchedulerApp> iter = new MultiListStartTimeIterator(
+        appsNowMaybeRunnable);
+    FiCaSchedulerApp prev = null;
+    List<FiCaSchedulerApp> noLongerPendingApps = new ArrayList<>();
+    while (iter.hasNext()) {
+      FiCaSchedulerApp next = iter.next();
+      if (next == prev) {
+        continue;
+      }
+
+      if (checkRunnabilityWithUpdate(next)) {
+        LeafQueue nextQueue = next.getCSLeafQueue();
+        LOG.info("{} is now runnable in {}",
+            next.getApplicationAttemptId(), nextQueue);
+        trackRunnableApp(next);
+        FiCaSchedulerApp appSched = next;
+        nextQueue.submitApplicationAttempt(next, next.getUser());
+        noLongerPendingApps.add(appSched);
+
+        if (noLongerPendingApps.size() >= maxRunnableApps) {
+          break;
+        }
+      }
+
+      prev = next;
+    }
+
+    // We remove the apps from their pending lists afterwards so that we don't
+    // pull them out from under the iterator.  If they are not in these lists
+    // in the first place, there is a bug.
+    for (FiCaSchedulerApp appSched : noLongerPendingApps) {
+      if (!(appSched.getCSLeafQueue().removeNonRunnableApp(appSched))) {
+        LOG.error("Can't make app runnable that does not already exist in queue"
+            + " as non-runnable: {}. This should never happen.",
+            appSched.getApplicationAttemptId());
+      }
+
+      if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) {
+        LOG.error("Waiting app {} expected to be in "
+            + "usersNonRunnableApps, but was not. This should never happen.",
+            appSched.getApplicationAttemptId());
+      }
+    }
+  }
+
+  public void untrackApp(FiCaSchedulerApp app) {
+    if (app.isRunnable()) {
+      untrackRunnableApp(app);
+    } else {
+      untrackNonRunnableApp(app);
+    }
+  }
+
+  /**
+   * Updates the relevant tracking variables after a runnable app with the given
+   * queue and user has been removed.
+   */
+  private void untrackRunnableApp(FiCaSchedulerApp app) {
+    // Update usersNumRunnableApps
+    String user = app.getUser();
+    int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
+    if (newUserNumRunning == 0) {
+      usersNumRunnableApps.remove(user);
+    } else {
+      usersNumRunnableApps.put(user, newUserNumRunning);
+    }
+
+    // Update runnable app bookkeeping for queues
+    AbstractCSQueue queue = (AbstractCSQueue) app.getQueue();
+    ParentQueue parent = (ParentQueue) queue.getParent();
+    while (parent != null) {
+      parent.decrementRunnableApps();
+      parent = (ParentQueue) parent.getParent();
+    }
+  }
+
+  /**
+   * Stops tracking the given non-runnable app.
+   */
+  private void untrackNonRunnableApp(FiCaSchedulerApp app) {
+    usersNonRunnableApps.remove(app.getUser(), app);
+  }
+
+  /**
+   * Traverses the queue hierarchy under the given queue to gather all lists
+   * of non-runnable applications.
+   */
+  private void gatherPossiblyRunnableAppLists(AbstractCSQueue queue,
+      List<List<FiCaSchedulerApp>> appLists) {
+    if (queue.getNumRunnableApps() < queue.getMaxParallelApps()) {
+      if (queue instanceof LeafQueue) {
+        appLists.add(
+            ((LeafQueue)queue).getCopyOfNonRunnableAppSchedulables());
+      } else {
+        for (CSQueue child : queue.getChildQueues()) {
+          gatherPossiblyRunnableAppLists((AbstractCSQueue) child, appLists);
+        }
+      }
+    }
+  }
+
+  private int getUserMaxParallelApps(String user) {
+    CapacitySchedulerConfiguration conf = scheduler.getConfiguration();
+    if (conf == null) {
+      return Integer.MAX_VALUE;
+    }
+
+    int userMaxParallelApps = conf.getMaxParallelAppsForUser(user);
+
+    return userMaxParallelApps;
+  }
+
+  /**
+   * Takes a list of lists, each of which is ordered by start time, and returns
+   * their elements in order of start time.
+   *
+   * We maintain positions in each of the lists.  Each next() call advances
+   * the position in one of the lists.  We maintain a heap that orders lists
+   * by the start time of the app in the current position in that list.
+   * This allows us to pick which list to advance in O(log(num lists)) instead
+   * of O(num lists) time.
+   */
+  static class MultiListStartTimeIterator implements
+      Iterator<FiCaSchedulerApp> {
+
+    private List<FiCaSchedulerApp>[] appLists;
+    private int[] curPositionsInAppLists;
+    private PriorityQueue<IndexAndTime> appListsByCurStartTime;
+
+    @SuppressWarnings("unchecked")
+    MultiListStartTimeIterator(List<List<FiCaSchedulerApp>> appListList) {
+      appLists = appListList.toArray(new List[appListList.size()]);
+      curPositionsInAppLists = new int[appLists.length];
+      appListsByCurStartTime = new PriorityQueue<IndexAndTime>();
+      for (int i = 0; i < appLists.length; i++) {
+        long time = appLists[i].isEmpty() ? Long.MAX_VALUE : appLists[i].get(0)
+            .getStartTime();
+        appListsByCurStartTime.add(new IndexAndTime(i, time));
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return !appListsByCurStartTime.isEmpty()
+          && appListsByCurStartTime.peek().time != Long.MAX_VALUE;
+    }
+
+    @Override
+    public FiCaSchedulerApp next() {
+      IndexAndTime indexAndTime = appListsByCurStartTime.remove();
+      int nextListIndex = indexAndTime.index;
+      FiCaSchedulerApp next = appLists[nextListIndex]
+          .get(curPositionsInAppLists[nextListIndex]);
+      curPositionsInAppLists[nextListIndex]++;
+
+      if (curPositionsInAppLists[nextListIndex] <
+          appLists[nextListIndex].size()) {
+        indexAndTime.time = appLists[nextListIndex]
+            .get(curPositionsInAppLists[nextListIndex]).getStartTime();
+      } else {
+        indexAndTime.time = Long.MAX_VALUE;
+      }
+      appListsByCurStartTime.add(indexAndTime);
+
+      return next;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove not supported");
+    }
+
+    private static class IndexAndTime implements Comparable<IndexAndTime> {
+      private int index;
+      private long time;
+
+      IndexAndTime(int index, long time) {
+        this.index = index;
+        this.time = time;
+      }
+
+      @Override
+      public int compareTo(IndexAndTime o) {
+        return time < o.time ? -1 : (time > o.time ? 1 : 0);
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (!(o instanceof IndexAndTime)) {
+          return false;
+        }
+        IndexAndTime other = (IndexAndTime)o;
+        return other.time == time;
+      }
+
+      @Override
+      public int hashCode() {
+        return (int)time;
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index a6aa824..bd2acd7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -242,8 +242,11 @@ public class CapacityScheduler extends
   private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
   private long asyncMaxPendingBacklogs;
 
+  private CSMaxRunningAppsEnforcer maxRunningEnforcer;
+
   public CapacityScheduler() {
     super(CapacityScheduler.class.getName());
+    this.maxRunningEnforcer = new CSMaxRunningAppsEnforcer(this);
   }
 
   @Override
@@ -483,6 +486,7 @@ public class CapacityScheduler extends
 
         super.reinitialize(newConf, rmContext);
       }
+      maxRunningEnforcer.updateRunnabilityOnReload();
     } finally {
       writeLock.unlock();
     }
@@ -1083,6 +1087,9 @@ public class CapacityScheduler extends
       // SchedulerApplication#setCurrentAppAttempt.
       attempt.setPriority(application.getPriority());
 
+      maxRunningEnforcer.checkRunnabilityWithUpdate(attempt);
+      maxRunningEnforcer.trackApp(attempt);
+
       queue.submitApplicationAttempt(attempt, application.getUser());
       LOG.info("Added Application Attempt " + applicationAttemptId
           + " to scheduler from user " + application.getUser() + " in queue "
@@ -1176,8 +1183,13 @@ public class CapacityScheduler extends
         LOG.error(
             "Cannot finish application " + "from non-leaf queue: "
             + csQueue.getQueuePath());
-      } else{
+      } else {
         csQueue.finishApplicationAttempt(attempt, csQueue.getQueuePath());
+
+        maxRunningEnforcer.untrackApp(attempt);
+        if (attempt.isRunnable()) {
+          maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
+        }
       }
     } finally {
       writeLock.unlock();
@@ -3253,4 +3265,9 @@ public class CapacityScheduler extends
   public int getNumAsyncSchedulerThreads() {
     return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
   }
+
+  @VisibleForTesting
+  public void setMaxRunningAppsEnforcer(CSMaxRunningAppsEnforcer enforcer) {
+    this.maxRunningEnforcer = enforcer;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 496dd0b..3bebb44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -378,6 +378,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
 
   public static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE);
 
+  public static final String MAX_PARALLEL_APPLICATIONS = "max-parallel-apps";
+
+  public static final int DEFAULT_MAX_PARALLEL_APPLICATIONS = Integer.MAX_VALUE;
+
   /**
    * Different resource types supported.
    */
@@ -412,7 +416,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     String queueName = PREFIX + queue + DOT + ORDERING_POLICY + DOT;
     return queueName;
   }
-  
+
+  static String getUserPrefix(String user) {
+    return PREFIX + "user." + user + DOT;
+  }
+
   private String getNodeLabelPrefix(String queue, String label) {
     if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
       return getQueuePrefix(queue);
@@ -1392,6 +1400,31 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST);
   }
 
+  public Integer getMaxParallelAppsForQueue(String queue) {
+    int defaultMaxParallelAppsForQueue =
+        getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
+        DEFAULT_MAX_PARALLEL_APPLICATIONS);
+
+    String maxParallelAppsForQueue = get(getQueuePrefix(queue)
+        + MAX_PARALLEL_APPLICATIONS);
+
+    return (maxParallelAppsForQueue != null) ?
+        Integer.parseInt(maxParallelAppsForQueue)
+        : defaultMaxParallelAppsForQueue;
+  }
+
+  public Integer getMaxParallelAppsForUser(String user) {
+    int defaultMaxParallelAppsForUser =
+        getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
+        DEFAULT_MAX_PARALLEL_APPLICATIONS);
+    String maxParallelAppsForUser = get(getUserPrefix(user)
+        + MAX_PARALLEL_APPLICATIONS);
+
+    return (maxParallelAppsForUser != null) ?
+        Integer.parseInt(maxParallelAppsForUser)
+        : defaultMaxParallelAppsForUser;
+  }
+
   private static final String PREEMPTION_CONFIG_PREFIX =
       "yarn.resourcemanager.monitor.capacity.preemption.";
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9f0caf2..4d83538 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -129,6 +129,9 @@ public class LeafQueue extends AbstractCSQueue {
   List<AppPriorityACLGroup> priorityAcls =
       new ArrayList<AppPriorityACLGroup>();
 
+  private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
+  private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public LeafQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -159,6 +162,7 @@ public class LeafQueue extends AbstractCSQueue {
     setupQueueConfigs(clusterResource, csContext.getConfiguration());
   }
 
+  @SuppressWarnings("checkstyle:nowhitespaceafter")
   protected void setupQueueConfigs(Resource clusterResource,
       CapacitySchedulerConfiguration conf) throws
       IOException {
@@ -289,7 +293,9 @@ public class LeafQueue extends AbstractCSQueue {
               + " (int)(configuredMaximumSystemApplications * absoluteCapacity)]"
               + "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser
               + " [= (int)(maxApplications * (userLimit / 100.0f) * "
-              + "userLimitFactor) ]" + "\n" + "usedCapacity = "
+              + "userLimitFactor) ]" + "\n"
+              + "maxParallelApps = " + getMaxParallelApps() + "\n"
+              + "usedCapacity = " +
               + queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / "
               + "(clusterResourceMemory * absoluteCapacity)]" + "\n"
               + "absoluteUsedCapacity = " + absoluteUsedCapacity
@@ -386,7 +392,8 @@ public class LeafQueue extends AbstractCSQueue {
   public int getNumApplications() {
     readLock.lock();
     try {
-      return getNumPendingApplications() + getNumActiveApplications();
+      return getNumPendingApplications() + getNumActiveApplications() +
+          getNumNonRunnableApps();
     } finally {
       readLock.unlock();
     }
@@ -887,16 +894,28 @@ public class LeafQueue extends AbstractCSQueue {
       writeLock.unlock();
     }
   }
-  
+
   private void addApplicationAttempt(FiCaSchedulerApp application,
       User user) {
     writeLock.lock();
     try {
+      applicationAttemptMap.put(application.getApplicationAttemptId(),
+          application);
+
+      if (application.isRunnable()) {
+        runnableApps.add(application);
+        LOG.debug("Adding runnable application: {}",
+            application.getApplicationAttemptId());
+      } else {
+        nonRunnableApps.add(application);
+        LOG.info("Application attempt {} is not runnable,"
+            + " parallel limit reached", application.getApplicationAttemptId());
+        return;
+      }
+
       // Accept
       user.submitApplication();
       getPendingAppsOrderingPolicy().addSchedulableEntity(application);
-      applicationAttemptMap.put(application.getApplicationAttemptId(),
-          application);
 
       // Activate applications
       if (Resources.greaterThan(resourceCalculator, lastClusterResource,
@@ -917,7 +936,9 @@ public class LeafQueue extends AbstractCSQueue {
               .getPendingApplications() + " #user-active-applications: " + user
               .getActiveApplications() + " #queue-pending-applications: "
               + getNumPendingApplications() + " #queue-active-applications: "
-              + getNumActiveApplications());
+              + getNumActiveApplications()
+              + " #queue-nonrunnable-applications: "
+              + getNumNonRunnableApps());
     } finally {
       writeLock.unlock();
     }
@@ -950,6 +971,15 @@ public class LeafQueue extends AbstractCSQueue {
       // which is caused by wrong invoking order, will fix UT separately
       User user = usersManager.getUserAndAddIfAbsent(userName);
 
+      boolean runnable = runnableApps.remove(application);
+      if (!runnable) {
+        // removeNonRunnableApp acquires the write lock again, which is fine
+        if (!removeNonRunnableApp(application)) {
+          LOG.error("Given app to remove " + application +
+              " does not exist in queue " + getQueuePath());
+        }
+      }
+
       String partitionName = application.getAppAMNodePartitionName();
       boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
       if (!wasActive) {
@@ -2229,4 +2259,43 @@ public class LeafQueue extends AbstractCSQueue {
         usedSeconds);
     metrics.updatePreemptedForCustomResources(containerResource);
   }
+
+  @Override
+  int getNumRunnableApps() {
+    readLock.lock();
+    try {
+      return runnableApps.size();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  int getNumNonRunnableApps() {
+    readLock.lock();
+    try {
+      return nonRunnableApps.size();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  boolean removeNonRunnableApp(FiCaSchedulerApp app) {
+    writeLock.lock();
+    try {
+      return nonRunnableApps.remove(app);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  List<FiCaSchedulerApp> getCopyOfNonRunnableAppSchedulables() {
+    List<FiCaSchedulerApp> appsToReturn = new ArrayList<>();
+    readLock.lock();
+    try {
+      appsToReturn.addAll(nonRunnableApps);
+    } finally {
+      readLock.unlock();
+    }
+    return appsToReturn;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 95f5468..bbb80ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -93,6 +93,8 @@ public class ParentQueue extends AbstractCSQueue {
 
   private long lastSkipQueueDebugLoggingTimestamp = -1;
 
+  private int runnableApps;
+
   public ParentQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
@@ -1383,4 +1385,32 @@ public class ParentQueue extends AbstractCSQueue {
   public QueueOrderingPolicy getQueueOrderingPolicy() {
     return queueOrderingPolicy;
   }
+
+  /**
+   * Returns the {@code runnableApps} counter, read while the read lock is
+   * held.
+   *
+   * @return the current value of {@code runnableApps}
+   */
+  @Override
+  int getNumRunnableApps() {
+    final int current;
+    readLock.lock();
+    try {
+      current = runnableApps;
+    } finally {
+      readLock.unlock();
+    }
+    return current;
+  }
+
+  /**
+   * Adds one to the {@code runnableApps} counter while the write lock is
+   * held.
+   */
+  void incrementRunnableApps() {
+    writeLock.lock();
+    try {
+      runnableApps += 1;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Subtracts one from the {@code runnableApps} counter while the write
+   * lock is held. The counter is not clamped at zero.
+   */
+  void decrementRunnableApps() {
+    writeLock.lock();
+    try {
+      runnableApps -= 1;
+    } finally {
+      writeLock.unlock();
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 8f6fb63..cf6ffd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -112,6 +112,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
   private AbstractContainerAllocator containerAllocator;
 
+  private boolean runnable;
+
   /**
    * to hold the message if its app doesn't not get container from a node
    */
@@ -139,6 +141,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
       ActivitiesManager activitiesManager) {
     super(applicationAttemptId, user, queue, abstractUsersManager, rmContext);
+    this.runnable = true;
 
     RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
 
@@ -1219,4 +1222,22 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       writeLock.unlock();
     }
   }
+
+  /**
+   * Updates the {@code runnable} flag of this application attempt. The
+   * assignment is performed while the write lock is held.
+   *
+   * @param runnable the new value of the flag
+   */
+  public void setRunnable(boolean runnable) {
+    writeLock.lock();
+    try {
+      this.runnable = runnable;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Reads the {@code runnable} flag of this application attempt while the
+   * read lock is held.
+   *
+   * @return the current value of the flag
+   */
+  public boolean isRunnable() {
+    final boolean current;
+    readLock.lock();
+    try {
+      current = runnable;
+    } finally {
+      readLock.unlock();
+    }
+    return current;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java
index ff5738c..389dd62 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSMaxRunningAppsEnforcer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
@@ -175,6 +176,9 @@ public class TestReservationSystem extends
 
     CapacityScheduler cs = Mockito.spy(new CapacityScheduler());
     cs.setConf(conf);
+    CSMaxRunningAppsEnforcer enforcer =
+        Mockito.mock(CSMaxRunningAppsEnforcer.class);
+    cs.setMaxRunningAppsEnforcer(enforcer);
 
     mockRMContext = ReservationSystemTestUtil.createRMContext(conf);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index bad943c..93d8d5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -184,6 +184,7 @@ public class TestApplicationLimits {
     doReturn(amResource).when(application).getAMResource(
         CommonNodeLabelsManager.NO_LABEL);
     when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod();
+    when(application.isRunnable()).thenReturn(true);
     return application;
   }
   
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java
new file mode 100644
index 0000000..e3c05a1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCSMaxRunningAppsEnforcer {
+  private CapacitySchedulerQueueManager queueManager;
+  private CSMaxRunningAppsEnforcer maxAppsEnforcer;
+  private int appNum;
+  private ControlledClock clock;
+  private RMContext rmContext;
+  private CapacityScheduler scheduler;
+  private ActivitiesManager activitiesManager;
+  private CapacitySchedulerConfiguration csConfig;
+
+  /**
+   * Builds the fixture for every test: a mocked {@link RMContext} and a
+   * mocked {@link CapacityScheduler} that both hand back the same
+   * {@link CapacitySchedulerConfiguration}, the enforcer under test, and a
+   * real {@link CapacitySchedulerQueueManager} initialized from the queue
+   * layout written by {@code setupQueues}.
+   */
+  @Before
+  public void setup() throws IOException {
+    csConfig = new CapacitySchedulerConfiguration();
+    rmContext = mock(RMContext.class);
+    when(rmContext.getYarnConfiguration()).thenReturn(csConfig);
+    when(rmContext.getRMApps()).thenReturn(new ConcurrentHashMap<>());
+    clock = new ControlledClock();
+    // The scheduler is a mock; only the getters stubbed below are answered.
+    scheduler = mock(CapacityScheduler.class);
+    when(rmContext.getScheduler()).thenReturn(scheduler);
+    when(scheduler.getConf()).thenReturn(csConfig);
+    when(scheduler.getConfig()).thenReturn(csConfig);
+    when(scheduler.getConfiguration()).thenReturn(csConfig);
+    when(scheduler.getResourceCalculator()).thenReturn(
+        new DefaultResourceCalculator());
+    when(scheduler.getRMContext()).thenReturn(rmContext);
+    when(scheduler.getClusterResource())
+        .thenReturn(Resource.newInstance(16384, 8));
+    when(scheduler.getMinimumAllocation())
+        .thenReturn(Resource.newInstance(1024, 1));
+    when(scheduler.getMinimumResourceCapability())
+        .thenReturn(Resource.newInstance(1024, 1));
+    activitiesManager = mock(ActivitiesManager.class);
+    maxAppsEnforcer = new CSMaxRunningAppsEnforcer(scheduler);
+    // Restart application numbering for each test.
+    appNum = 0;
+    // Queue definitions must be in csConfig before the queues are built.
+    setupQueues(csConfig);
+    RMNodeLabelsManager labelManager = mock(RMNodeLabelsManager.class);
+    AppPriorityACLsManager appPriorityACLManager =
+        mock(AppPriorityACLsManager.class);
+    when(rmContext.getNodeLabelManager()).thenReturn(labelManager);
+    when(labelManager.getResourceByLabel(anyString(), any(Resource.class)))
+        .thenReturn(Resource.newInstance(16384, 8));
+    queueManager = new CapacitySchedulerQueueManager(csConfig, labelManager,
+        appPriorityACLManager);
+    queueManager.setCapacitySchedulerContext(scheduler);
+    queueManager.initializeQueues(csConfig);
+  }
+
+  /**
+   * Writes the queue hierarchy used by the tests into the configuration.
+   * The top-level queue gets children queue1 and queue2; root.queue1 gets
+   * subqueue1 and subqueue2, which contain leaf1 and leaf2 respectively.
+   * Siblings share capacity 50/50; root and the two leaves are set to 100.
+   *
+   * @param config configuration that receives the queue definitions
+   */
+  private void setupQueues(CapacitySchedulerConfiguration config) {
+    // Top level.
+    config.setFloat(PREFIX + "root.capacity", 100f);
+    config.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"queue1", "queue2"});
+    config.setFloat(PREFIX + "root.queue1.capacity", 50f);
+    config.setFloat(PREFIX + "root.queue2.capacity", 50f);
+
+    // Children of root.queue1.
+    config.setQueues("root.queue1", new String[] {"subqueue1", "subqueue2"});
+    config.setFloat(PREFIX + "root.queue1.subqueue1.capacity", 50f);
+    config.setFloat(PREFIX + "root.queue1.subqueue2.capacity", 50f);
+
+    // One leaf under each sub-queue.
+    config.setQueues("root.queue1.subqueue1", new String[] {"leaf1"});
+    config.setFloat(PREFIX + "root.queue1.subqueue1.leaf1.capacity", 100f);
+    config.setQueues("root.queue1.subqueue2", new String[] {"leaf2"});
+    config.setFloat(PREFIX + "root.queue1.subqueue2.leaf2.capacity", 100f);
+  }
+
+  /**
+   * Creates a new application attempt for the given user, passes it through
+   * the enforcer ({@code checkRunnabilityWithUpdate}, then
+   * {@code trackApp}) and submits it to the queue.
+   *
+   * @param queue leaf queue the attempt is submitted to
+   * @param user user owning the attempt
+   * @return the created attempt
+   */
+  private FiCaSchedulerApp addApp(LeafQueue queue, String user) {
+    ApplicationId appId = ApplicationId.newInstance(0, appNum++);
+    ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0);
+
+    FiCaSchedulerApp attempt = new FiCaSchedulerApp(attId,
+        user, queue, queue.getAbstractUsersManager(),
+        rmContext, Priority.newInstance(0), false,
+        activitiesManager) {
+
+      // Start time is captured from the controlled clock at creation, so
+      // tests can order attempts by advancing the clock between addApp calls.
+      private final long startTime = clock.getTime();
+
+      @Override
+      public long getStartTime() {
+        return startTime;
+      }
+    };
+
+    maxAppsEnforcer.checkRunnabilityWithUpdate(attempt);
+    maxAppsEnforcer.trackApp(attempt);
+
+    queue.submitApplicationAttempt(attempt, attempt.getUser());
+
+    return attempt;
+  }
+
+  /**
+   * Finishes the attempt on its leaf queue, then untracks it in the
+   * enforcer and asks the enforcer to update runnability after the removal.
+   *
+   * @param attempt the attempt to remove
+   */
+  private void removeApp(FiCaSchedulerApp attempt) {
+    LeafQueue queue = attempt.getCSLeafQueue();
+    queue.finishApplicationAttempt(attempt, queue.getQueuePath());
+    maxAppsEnforcer.untrackApp(attempt);
+    maxAppsEnforcer.updateRunnabilityOnAppRemoval(attempt);
+  }
+
+  /**
+   * Root is limited to 2 parallel apps and each leaf to 1. The second app
+   * submitted to leaf2 is expected to be non-runnable, and removing the app
+   * on leaf1 is expected to leave it non-runnable.
+   */
+  @Test
+  public void testRemoveDoesNotEnableAnyApp() {
+    ParentQueue rootQueue = (ParentQueue) queueManager.getRootQueue();
+    LeafQueue firstLeaf = (LeafQueue) queueManager
+        .getQueueByFullName("root.queue1.subqueue1.leaf1");
+    LeafQueue secondLeaf = (LeafQueue) queueManager
+        .getQueueByFullName("root.queue1.subqueue2.leaf2");
+    rootQueue.setMaxParallelApps(2);
+    firstLeaf.setMaxParallelApps(1);
+    secondLeaf.setMaxParallelApps(1);
+
+    FiCaSchedulerApp appOnFirstLeaf = addApp(firstLeaf, "user");
+    for (int i = 0; i < 2; i++) {
+      addApp(secondLeaf, "user");
+    }
+    assertEquals(1, firstLeaf.getNumRunnableApps());
+    assertEquals(1, secondLeaf.getNumRunnableApps());
+    assertEquals(1, secondLeaf.getNumNonRunnableApps());
+
+    removeApp(appOnFirstLeaf);
+    assertEquals(0, firstLeaf.getNumRunnableApps());
+    assertEquals(1, secondLeaf.getNumRunnableApps());
+    assertEquals(1, secondLeaf.getNumNonRunnableApps());
+  }
+
+  /**
+   * root.queue1 is limited to 2 parallel apps. With one app on leaf1 and
+   * two on leaf2, removing the leaf1 app is expected to make the waiting
+   * leaf2 app runnable.
+   */
+  @Test
+  public void testRemoveEnablesAppOnCousinQueue() {
+    ParentQueue sharedAncestor = (ParentQueue) queueManager
+        .getQueueByFullName("root.queue1");
+    LeafQueue firstLeaf = (LeafQueue) queueManager
+        .getQueueByFullName("root.queue1.subqueue1.leaf1");
+    LeafQueue secondLeaf = (LeafQueue) queueManager
+        .getQueueByFullName("root.queue1.subqueue2.leaf2");
+    sharedAncestor.setMaxParallelApps(2);
+
+    FiCaSchedulerApp appOnFirstLeaf = addApp(firstLeaf, "user");
+    for (int i = 0; i < 2; i++) {
+      addApp(secondLeaf, "user");
+    }
+    assertEquals(1, firstLeaf.getNumRunnableApps());
+    assertEquals(1, secondLeaf.getNumRunnableApps());
+    assertEquals(1, secondLeaf.getNumNonRunnableApps());
+
+    removeApp(appOnFirstLeaf);
+    assertEquals(0, firstLeaf.getNumRunnableApps());
+    assertEquals(2, secondLeaf.getNumRunnableApps());
+    assertEquals(0, secondLeaf.getNumNonRunnableApps());
+  }
+
+  /**
+   * leaf1 is limited to 2 parallel apps and user1 is configured with a
+   * max-parallel-apps value of 1. Removing user1's app on leaf1 is expected
+   * to enable both the waiting app on leaf1 and user1's waiting app on
+   * leaf2.
+   */
+  @Test
+  public void testRemoveEnablesOneByQueueOneByUser() {
+    LeafQueue firstLeaf = (LeafQueue) queueManager
+        .getQueueByFullName("root.queue1.subqueue1.leaf1");
+    LeafQueue secondLeaf = (LeafQueue) queueManager
+        .getQueueByFullName("root.queue1.subqueue2.leaf2");
+    firstLeaf.setMaxParallelApps(2);
+    // Per-user setting for user1.
+    csConfig.setInt(PREFIX + "user.user1.max-parallel-apps", 1);
+
+    FiCaSchedulerApp user1AppOnFirstLeaf = addApp(firstLeaf, "user1");
+    addApp(firstLeaf, "user2");
+    addApp(firstLeaf, "user3");
+    addApp(secondLeaf, "user1");
+    assertEquals(2, firstLeaf.getNumRunnableApps());
+    assertEquals(1, firstLeaf.getNumNonRunnableApps());
+    assertEquals(1, secondLeaf.getNumNonRunnableApps());
+
+    removeApp(user1AppOnFirstLeaf);
+    assertEquals(2, firstLeaf.getNumRunnableApps());
+    assertEquals(1, secondLeaf.getNumRunnableApps());
+    assertEquals(0, firstLeaf.getNumNonRunnableApps());
+    assertEquals(0, secondLeaf.getNumNonRunnableApps());
+  }
+
+  /**
+   * root.queue1 is limited to 2 parallel apps. Two apps end up waiting: one
+   * on leaf2 and, after the clock is advanced by 20 seconds, one on leaf1.
+   * Removing the running leaf1 app is expected to enable the leaf2 app,
+   * which has the earlier start time.
+   */
+  @Test
+  public void testRemoveEnablingOrderedByStartTime() {
+    ParentQueue sharedAncestor = (ParentQueue) queueManager
+        .getQueueByFullName("root.queue1");
+    LeafQueue firstLeaf = (LeafQueue) queueManager
+        .getQueueByFullName("root.queue1.subqueue1.leaf1");
+    LeafQueue secondLeaf = (LeafQueue) queueManager
+        .getQueueByFullName("root.queue1.subqueue2.leaf2");
+    sharedAncestor.setMaxParallelApps(2);
+
+    FiCaSchedulerApp runningOnFirstLeaf = addApp(firstLeaf, "user");
+    addApp(secondLeaf, "user");
+    addApp(secondLeaf, "user");
+    // The next attempt gets a later start time than the ones above.
+    clock.tickSec(20);
+    addApp(firstLeaf, "user");
+    assertEquals(1, firstLeaf.getNumRunnableApps());
+    assertEquals(1, secondLeaf.getNumRunnableApps());
+    assertEquals(1, firstLeaf.getNumNonRunnableApps());
+    assertEquals(1, secondLeaf.getNumNonRunnableApps());
+
+    removeApp(runningOnFirstLeaf);
+    assertEquals(0, firstLeaf.getNumRunnableApps());
+    assertEquals(2, secondLeaf.getNumRunnableApps());
+    assertEquals(0, secondLeaf.getNumNonRunnableApps());
+  }
+
+  /**
+   * root.queue1 is limited to 2 parallel apps. With one app on leaf1 and
+   * three on leaf2, removing the leaf1 app is expected to enable exactly
+   * one of the two waiting leaf2 apps.
+   */
+  @Test
+  public void testMultipleAppsWaitingOnCousinQueue() {
+    ParentQueue sharedAncestor = (ParentQueue) queueManager
+        .getQueueByFullName("root.queue1");
+    LeafQueue firstLeaf = (LeafQueue) queueManager
+        .getQueueByFullName("root.queue1.subqueue1.leaf1");
+    LeafQueue secondLeaf = (LeafQueue) queueManager
+        .getQueueByFullName("root.queue1.subqueue2.leaf2");
+    sharedAncestor.setMaxParallelApps(2);
+
+    FiCaSchedulerApp appOnFirstLeaf = addApp(firstLeaf, "user");
+    for (int i = 0; i < 3; i++) {
+      addApp(secondLeaf, "user");
+    }
+    assertEquals(1, firstLeaf.getNumRunnableApps());
+    assertEquals(1, secondLeaf.getNumRunnableApps());
+    assertEquals(2, secondLeaf.getNumNonRunnableApps());
+
+    removeApp(appOnFirstLeaf);
+    assertEquals(0, firstLeaf.getNumRunnableApps());
+    assertEquals(2, secondLeaf.getNumRunnableApps());
+    assertEquals(1, secondLeaf.getNumNonRunnableApps());
+  }
+
+  /**
+   * Feeds two single-element lists (start times 1 and 2) to
+   * {@code MultiListStartTimeIterator} and expects the attempts back in
+   * start-time order.
+   *
+   * NOTE(review): despite the method name, no empty list is added to the
+   * input here.
+   */
+  @Test
+  public void testMultiListStartTimeIteratorEmptyAppLists() {
+    List<List<FiCaSchedulerApp>> appLists = new ArrayList<>();
+    appLists.add(Arrays.asList(mockAppAttempt(1)));
+    appLists.add(Arrays.asList(mockAppAttempt(2)));
+
+    Iterator<FiCaSchedulerApp> byStartTime =
+        new CSMaxRunningAppsEnforcer.MultiListStartTimeIterator(appLists);
+
+    FiCaSchedulerApp first = byStartTime.next();
+    assertEquals(1, first.getStartTime());
+    FiCaSchedulerApp second = byStartTime.next();
+    assertEquals(2, second.getStartTime());
+  }
+
+  /**
+   * Creates a mocked attempt whose {@code getStartTime()} is stubbed.
+   *
+   * @param startTime value the mock returns from {@code getStartTime()}
+   * @return the mocked attempt
+   */
+  private FiCaSchedulerApp mockAppAttempt(long startTime) {
+    final FiCaSchedulerApp mockedAttempt = mock(FiCaSchedulerApp.class);
+    when(mockedAttempt.getStartTime()).thenReturn(startTime);
+    return mockedAttempt;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMaxParallelApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMaxParallelApps.java
new file mode 100644
index 0000000..d2e3278
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMaxParallelApps.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestCapacitySchedulerMaxParallelApps {
+  private CapacitySchedulerConfiguration conf;
+  private MockRM rm;
+  private MockNM nm1;
+
+  private RMApp app1;
+  private MockAM am1;
+  private RMApp app2;
+  private MockAM am2;
+  private RMApp app3;
+  private RMAppAttempt attempt3;
+  private RMApp app4;
+  private RMAppAttempt attempt4;
+
+  private ParentQueue rootQueue;
+  private LeafQueue defaultQueue;
+
+  /**
+   * Creates the configuration shared by the tests: a fresh
+   * {@link CapacitySchedulerConfiguration} whose resource calculator class
+   * is set to {@link DominantResourceCalculator}.
+   */
+  @Before
+  public void setUp() {
+    final String calculatorClass = DominantResourceCalculator.class.getName();
+    CapacitySchedulerConfiguration baseConfig =
+        new CapacitySchedulerConfiguration();
+    baseConfig.set(
+        CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+        calculatorClass);
+
+    conf = new CapacitySchedulerConfiguration(baseConfig);
+  }
+
+  /**
+   * Stops the MockRM if a test started one.
+   */
+  @After
+  public void after() {
+    if (rm == null) {
+      return;
+    }
+    rm.stop();
+  }
+
+  /**
+   * Runs the common scenario with max-parallel-apps set to 2 on the
+   * root.default queue.
+   */
+  @Test(timeout = 30000)
+  public void testMaxParallelAppsExceedsQueueSetting() throws Exception {
+    final String limitKey =
+        "yarn.scheduler.capacity.root.default.max-parallel-apps";
+    conf.setInt(limitKey, 2);
+
+    executeCommonStepsAndChecks();
+    testWhenSettingsExceeded();
+  }
+
+  /**
+   * Runs the common scenario with the scheduler-wide max-parallel-apps
+   * key set to 2 (no queue path in the key).
+   */
+  @Test(timeout = 30000)
+  public void testMaxParallelAppsExceedsDefaultQueueSetting()
+      throws Exception {
+    final String limitKey = "yarn.scheduler.capacity.max-parallel-apps";
+    conf.setInt(limitKey, 2);
+
+    executeCommonStepsAndChecks();
+    testWhenSettingsExceeded();
+  }
+
+  /**
+   * Runs the common scenario with max-parallel-apps set to 2 for the user
+   * "testuser", the user that submits every app in the scenario.
+   */
+  @Test(timeout = 30000)
+  public void testMaxParallelAppsExceedsUserSetting() throws Exception {
+    final String limitKey =
+        "yarn.scheduler.capacity.user.testuser.max-parallel-apps";
+    conf.setInt(limitKey, 2);
+
+    executeCommonStepsAndChecks();
+    testWhenSettingsExceeded();
+  }
+
+  /**
+   * Runs the common scenario with the user-level max-parallel-apps key set
+   * to 2 (no user name in the key).
+   */
+  @Test(timeout = 30000)
+  public void testMaxParallelAppsExceedsDefaultUserSetting() throws Exception {
+    final String limitKey = "yarn.scheduler.capacity.user.max-parallel-apps";
+    conf.setInt(limitKey, 2);
+
+    executeCommonStepsAndChecks();
+    testWhenSettingsExceeded();
+  }
+
+  /**
+   * Starts the common scenario with a queue limit of 2, then removes the
+   * limit and reinitializes the scheduler. Apps #3 and #4 are expected to
+   * be launchable afterwards, giving four running apps and none accepted.
+   */
+  @Test(timeout = 30000)
+  public void testMaxParallelAppsWhenReloadingConfig() throws Exception {
+    conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
+
+    executeCommonStepsAndChecks();
+
+    RMContext rmContext = rm.getRMContext();
+    // Disable parallel apps setting + max out AM percent
+    conf.unset("yarn.scheduler.capacity.root.default.max-parallel-apps");
+    conf.setFloat(PREFIX + "maximum-am-resource-percent", 1.0f);
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    cs.reinitialize(conf, rmContext);
+
+    // Both app #3 and app #4 should now be able to reach RUNNING
+    launchAMandWaitForRunning(app3, attempt3, nm1);
+    launchAMandWaitForRunning(app4, attempt4, nm1);
+    verifyRunningAndAcceptedApps(4, 0);
+  }
+
+  /**
+   * Configures root.default with max-parallel-apps 2 and
+   * maximum-applications 4, runs the common scenario (which submits four
+   * apps) and then submits a fifth app, which is expected to end up in the
+   * FAILED state.
+   */
+  @Test(timeout = 30000)
+  public void testMaxAppsReachedWithNonRunnableApps() throws Exception {
+    conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
+    conf.setInt("yarn.scheduler.capacity.root.default.maximum-applications", 4);
+    executeCommonStepsAndChecks();
+
+    // Do not wait for ACCEPTED: the fifth app is expected to fail instead.
+    MockRMAppSubmissionData fifthAppData =
+        MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
+            .withAppName("app5")
+            .withUser("testuser")
+            .withQueue("default")
+            .withWaitForAppAcceptedState(false)
+            .build();
+    RMApp fifthApp = MockRMAppSubmitter.submit(rm, fifthAppData);
+
+    rm.waitForState(fifthApp.getApplicationId(), RMAppState.FAILED);
+  }
+
+  /**
+   * Common scenario shared by the tests: starts a MockRM with three
+   * registered nodes and submits four apps as "testuser" to the "default"
+   * queue. The AMs of apps #1 and #2 are launched and registered; apps #3
+   * and #4 are only waited on until their attempts are scheduled. The
+   * method then verifies that two apps are runnable and that attempts #3
+   * and #4 are the non-runnable ones.
+   */
+  private void executeCommonStepsAndChecks() throws Exception {
+    rm = new MockRM(conf);
+    rm.start();
+
+    nm1 = rm.registerNode("h1:1234", 4096, 8);
+    rm.registerNode("h2:1234", 4096, 8);
+    rm.registerNode("h3:1234", 4096, 8);
+
+    rm.drainEvents();
+
+    app1 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
+        .withAppName("app1")
+        .withUser("testuser")
+        .withQueue("default")
+        .build());
+
+    am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+    app2 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
+        .withAppName("app2")
+        .withUser("testuser")
+        .withQueue("default")
+        .build());
+    am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+
+    // Apps #3 and #4 are submitted but their AMs are not launched here.
+    app3 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
+        .withAppName("app3")
+        .withUser("testuser")
+        .withQueue("default")
+        .build());
+    attempt3 = MockRM.waitForAttemptScheduled(app3, rm);
+
+    app4 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
+        .withAppName("app4")
+        .withUser("testuser")
+        .withQueue("default")
+        .build());
+    attempt4 = MockRM.waitForAttemptScheduled(app4, rm);
+
+    // Check that app attempt #3 and #4 are non-runnable
+    rootQueue = getRootQueue();
+    defaultQueue = getDefaultQueue();
+    Set<ApplicationAttemptId> nonRunnables =
+        Sets.newHashSet(
+            attempt3.getAppAttemptId(),
+            attempt4.getAppAttemptId());
+    verifyRunnableAppsInParent(rootQueue, 2);
+    verifyRunnableAppsInLeaf(defaultQueue, 2, nonRunnables);
+    verifyRunningAndAcceptedApps(2, 2);
+  }
+
+  /**
+   * Continuation of the common scenario: finishes the running apps one at
+   * a time and checks that each finished app lets exactly one waiting app
+   * be launched, so the number of runnable apps stays at 2.
+   */
+  private void testWhenSettingsExceeded() throws Exception {
+    // Stop app #1
+    unregisterAMandWaitForFinish(app1, am1, nm1);
+
+    // Launch app #3
+    launchAMandWaitForRunning(app3, attempt3, nm1);
+
+    // Check that attempt #4 is still non-runnable
+    verifyRunnableAppsInParent(rootQueue, 2);
+    verifyRunnableAppsInLeaf(defaultQueue, 2,
+        Collections.singleton(attempt4.getAppAttemptId()));
+    verifyRunningAndAcceptedApps(2, 1);
+
+    // Stop app #2
+    unregisterAMandWaitForFinish(app2, am2, nm1);
+
+    // Launch app #4
+    launchAMandWaitForRunning(app4, attempt4, nm1);
+    verifyRunnableAppsInParent(rootQueue, 2);
+    verifyRunnableAppsInLeaf(defaultQueue, 2,
+        Collections.emptySet());
+    verifyRunningAndAcceptedApps(2, 0);
+  }
+
+  /**
+   * Looks up the queue named "default" in the RM's scheduler.
+   *
+   * @return the "default" queue, cast to {@link LeafQueue}
+   */
+  private LeafQueue getDefaultQueue() {
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm.getResourceScheduler();
+    return (LeafQueue) scheduler.getQueue("default");
+  }
+
+  /**
+   * Looks up the queue named "root" in the RM's scheduler.
+   *
+   * @return the "root" queue, cast to {@link ParentQueue}
+   */
+  private ParentQueue getRootQueue() {
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm.getResourceScheduler();
+    return (ParentQueue) scheduler.getQueue("root");
+  }
+
+  /**
+   * Asserts the number of runnable apps reported by a parent queue.
+   *
+   * @param queue parent queue to inspect
+   * @param expectedRunnable expected result of {@code getNumRunnableApps()}
+   */
+  private void verifyRunnableAppsInParent(ParentQueue queue,
+      int expectedRunnable) {
+    final int actualRunnable = queue.getNumRunnableApps();
+    assertEquals("Num of runnable apps", expectedRunnable, actualRunnable);
+  }
+
+  /**
+   * Asserts the runnable and non-runnable state of a leaf queue.
+   *
+   * Every non-runnable attempt reported by the queue must be one of the
+   * expected ids. In addition, the number of non-runnable apps must equal
+   * the size of the expected set; without that, the per-id check would
+   * pass vacuously when the queue reports no non-runnable apps at all.
+   *
+   * @param queue leaf queue to inspect
+   * @param expectedRunnable expected result of {@code getNumRunnableApps()}
+   * @param nonRunnableIds ids of the attempts expected to be non-runnable
+   */
+  private void verifyRunnableAppsInLeaf(LeafQueue queue, int expectedRunnable,
+      Set<ApplicationAttemptId> nonRunnableIds) {
+    assertEquals("Num of runnable apps", expectedRunnable,
+        queue.getNumRunnableApps());
+    assertEquals("Num of non-runnable apps", nonRunnableIds.size(),
+        queue.getNumNonRunnableApps());
+
+    queue.getCopyOfNonRunnableAppSchedulables()
+        .stream()
+        .map(fca -> fca.getApplicationAttemptId())
+        .forEach(id -> assertTrue(id + " not found as non-runnable",
+          nonRunnableIds.contains(id)));
+  }
+
+  /**
+   * Fetches all application reports through the RM's client service and
+   * asserts how many are in the RUNNING and ACCEPTED states.
+   *
+   * @param expectedRunning expected number of RUNNING applications
+   * @param expectedAccepted expected number of ACCEPTED applications
+   * @throws YarnException declared for the client-service call
+   */
+  private void verifyRunningAndAcceptedApps(int expectedRunning,
+      int expectedAccepted) throws YarnException {
+    GetApplicationsRequest allAppsRequest =
+        GetApplicationsRequest.newInstance();
+    GetApplicationsResponse response =
+        rm.getClientRMService().getApplications(allAppsRequest);
+    List<ApplicationReport> reports = response.getApplicationList();
+
+    long running = 0;
+    long accepted = 0;
+    for (ApplicationReport report : reports) {
+      YarnApplicationState state = report.getYarnApplicationState();
+      if (state == YarnApplicationState.RUNNING) {
+        running++;
+      } else if (state == YarnApplicationState.ACCEPTED) {
+        accepted++;
+      }
+    }
+
+    assertEquals("Running apps count", expectedRunning, running);
+    assertEquals("Accepted apps count", expectedAccepted, accepted);
+  }
+
+  /**
+   * Unregisters the AM, sends a node heartbeat reporting container 1 of the
+   * app's current attempt as COMPLETE, and waits until that attempt reaches
+   * the FINISHED state.
+   *
+   * @param app application whose current attempt is finished
+   * @param am the application's AM
+   * @param nm node manager that sends the heartbeat
+   */
+  private void unregisterAMandWaitForFinish(RMApp app, MockAM am, MockNM nm)
+      throws Exception {
+    am.unregisterAppAttempt();
+    nm.nodeHeartbeat(app.getCurrentAppAttempt().getAppAttemptId(), 1,
+        ContainerState.COMPLETE);
+    rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
+        RMAppAttemptState.FINISHED);
+  }
+
+  /**
+   * Drives a previously waiting app to the RUNNING state: sends node
+   * heartbeats around a scheduler update, reports the AM as launched,
+   * registers it and waits for the application to reach RUNNING.
+   *
+   * @param app application expected to reach RUNNING
+   * @param attempt attempt whose AM is launched
+   * @param nm node manager that sends the heartbeats
+   * @return the registered AM
+   */
+  @SuppressWarnings("rawtypes")
+  private MockAM launchAMandWaitForRunning(RMApp app, RMAppAttempt attempt,
+      MockNM nm) throws Exception {
+    nm.nodeHeartbeat(true);
+    // Raw type is used only to invoke update() on the scheduler.
+    ((AbstractYarnScheduler)rm.getResourceScheduler()).update();
+    rm.drainEvents();
+    nm.nodeHeartbeat(true);
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+
+    return am;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 3353eac..f664e03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -456,6 +456,8 @@ public class TestLeafQueue {
 
   @Test
   public void testAppAttemptMetrics() throws Exception {
+    CSMaxRunningAppsEnforcer enforcer = mock(CSMaxRunningAppsEnforcer.class);
+    cs.setMaxRunningAppsEnforcer(enforcer);
 
     // Manipulate queue 'a'
     LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
index 13957e9..aa3b591 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
@@ -218,6 +218,7 @@ public class TestQueueState {
         CommonNodeLabelsManager.NO_LABEL);
     when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
         .thenCallRealMethod();
+    when(application.isRunnable()).thenReturn(true);
     return application;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
index a4c1300..e893717 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
@@ -157,6 +157,7 @@ public class TestQueueStateManager {
         CommonNodeLabelsManager.NO_LABEL);
     when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
         .thenCallRealMethod();
+    when(application.isRunnable()).thenReturn(true);
     return application;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org