You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/12/01 22:40:34 UTC

[hadoop] branch trunk updated: YARN-10278: CapacityScheduler test framework ProportionalCapacityPreemptionPolicyMockFramework. Contributed by Szilard Nemeth (snemeth)

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fa773a8  YARN-10278: CapacityScheduler test framework ProportionalCapacityPreemptionPolicyMockFramework. Contributed by Szilard Nemeth (snemeth)
fa773a8 is described below

commit fa773a832652ac6f2a24f93fc2bfc2ae251bfe1d
Author: Eric Payne <ep...@apache.org>
AuthorDate: Tue Dec 1 22:06:47 2020 +0000

    YARN-10278: CapacityScheduler test framework ProportionalCapacityPreemptionPolicyMockFramework. Contributed by Szilard Nemeth (snemeth)
---
 ...ionalCapacityPreemptionPolicyMockFramework.java | 985 ---------------------
 .../TestPreemptionForQueueWithPriorities.java      |  67 +-
 ...lCapacityPreemptionPolicyForNodePartitions.java |  71 +-
 ...acityPreemptionPolicyForReservedContainers.java |  43 +-
 ...lCapacityPreemptionPolicyInterQueueWithDRF.java |  15 +-
 ...ortionalCapacityPreemptionPolicyIntraQueue.java |  85 +-
 ...cityPreemptionPolicyIntraQueueFairOrdering.java |  15 +-
 ...apacityPreemptionPolicyIntraQueueUserLimit.java |  55 +-
 ...lCapacityPreemptionPolicyIntraQueueWithDRF.java |  17 +-
 ...ionalCapacityPreemptionPolicyMockFramework.java |   1 +
 ...alCapacityPreemptionPolicyPreemptToBalance.java |  15 +-
 .../mockframework/ContainerSpecification.java      | 109 +++
 .../capacity/mockframework/MockApplication.java    | 165 ++++
 .../capacity/mockframework/MockApplications.java   | 182 ++++
 .../capacity/mockframework/MockContainer.java      |  78 ++
 .../capacity/mockframework/MockContainers.java     | 104 +++
 .../mockframework/MockNodeLabelsManager.java       |  85 ++
 .../capacity/mockframework/MockQueueHierarchy.java | 397 +++++++++
 .../capacity/mockframework/MockSchedulerNodes.java |  90 ++
 ...ionalCapacityPreemptionPolicyMockFramework.java | 318 +++++++
 20 files changed, 1725 insertions(+), 1172 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
deleted file mode 100644
index 87f2f1f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ /dev/null
@@ -1,985 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceUtils;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.hadoop.yarn.event.Event;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class ProportionalCapacityPreemptionPolicyMockFramework {
-  static final Logger LOG = LoggerFactory.getLogger(
-      TestProportionalCapacityPreemptionPolicyForNodePartitions.class);
-  final String ROOT = CapacitySchedulerConfiguration.ROOT;
-
-  Map<String, CSQueue> nameToCSQueues = null;
-  Map<String, Resource> partitionToResource = null;
-  Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes = null;
-  RMNodeLabelsManager nlm = null;
-  RMContext rmContext = null;
-
-  ResourceCalculator rc = new DefaultResourceCalculator();
-  Clock mClock = null;
-  CapacitySchedulerConfiguration conf = null;
-  CapacityScheduler cs = null;
-  @SuppressWarnings("rawtypes")
-  EventHandler<Event> mDisp = null;
-  ProportionalCapacityPreemptionPolicy policy = null;
-  Resource clusterResource = null;
-  // Initialize resource map
-  Map<String, ResourceInformation> riMap = new HashMap<>();
-
-  private void resetResourceInformationMap() {
-    // Initialize mandatory resources
-    ResourceInformation memory = ResourceInformation.newInstance(
-        ResourceInformation.MEMORY_MB.getName(),
-        ResourceInformation.MEMORY_MB.getUnits(),
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-    ResourceInformation vcores = ResourceInformation.newInstance(
-        ResourceInformation.VCORES.getName(),
-        ResourceInformation.VCORES.getUnits(),
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
-    riMap.put(ResourceInformation.MEMORY_URI, memory);
-    riMap.put(ResourceInformation.VCORES_URI, vcores);
-
-    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Before
-  public void setup() {
-    resetResourceInformationMap();
-
-    org.apache.log4j.Logger.getRootLogger().setLevel(
-        org.apache.log4j.Level.DEBUG);
-
-    conf = new CapacitySchedulerConfiguration(new Configuration(false));
-    conf.setLong(
-        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
-    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
-        3000);
-    // report "ideal" preempt
-    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
-        (float) 1.0);
-    conf.setFloat(
-        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
-        (float) 1.0);
-
-    mClock = mock(Clock.class);
-    cs = mock(CapacityScheduler.class);
-    when(cs.getResourceCalculator()).thenReturn(rc);
-    when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
-    when(cs.getConfiguration()).thenReturn(conf);
-
-    nlm = mock(RMNodeLabelsManager.class);
-    mDisp = mock(EventHandler.class);
-
-    rmContext = mock(RMContext.class);
-    when(rmContext.getNodeLabelManager()).thenReturn(nlm);
-    Dispatcher disp = mock(Dispatcher.class);
-    when(rmContext.getDispatcher()).thenReturn(disp);
-    when(disp.getEventHandler()).thenReturn(mDisp);
-    when(cs.getRMContext()).thenReturn(rmContext);
-
-    partitionToResource = new HashMap<>();
-    nodeIdToSchedulerNodes = new HashMap<>();
-    nameToCSQueues = new HashMap<>();
-    clusterResource = Resource.newInstance(0, 0);
-  }
-
-  @After
-  public void cleanup() {
-    resetResourceInformationMap();
-  }
-
-  public void buildEnv(String labelsConfig, String nodesConfig,
-      String queuesConfig, String appsConfig) throws IOException {
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false);
-  }
-
-  public void buildEnv(String labelsConfig, String nodesConfig,
-      String queuesConfig, String appsConfig,
-      boolean useDominantResourceCalculator) throws IOException {
-    if (useDominantResourceCalculator) {
-      when(cs.getResourceCalculator()).thenReturn(
-          new DominantResourceCalculator());
-    }
-    mockNodeLabelsManager(labelsConfig);
-    mockSchedulerNodes(nodesConfig);
-    for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
-      when(cs.getSchedulerNode(nodeId)).thenReturn(
-          nodeIdToSchedulerNodes.get(nodeId));
-    }
-    List<FiCaSchedulerNode> allNodes = new ArrayList<>(
-        nodeIdToSchedulerNodes.values());
-    when(cs.getAllNodes()).thenReturn(allNodes);
-    ParentQueue root = mockQueueHierarchy(queuesConfig);
-    when(cs.getRootQueue()).thenReturn(root);
-    when(cs.getClusterResource()).thenReturn(clusterResource);
-    mockApplications(appsConfig);
-
-    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs,
-        mClock);
-  }
-
-  public void updateQueueConfig(String queuesConfig) {
-    ParentQueue root = mockQueueHierarchy(queuesConfig);
-    when(cs.getRootQueue()).thenReturn(root);
-  }
-
-  private void mockContainers(String containersConfig, FiCaSchedulerApp app,
-      ApplicationAttemptId attemptId, String queueName,
-      List<RMContainer> reservedContainers, List<RMContainer> liveContainers) {
-    int containerId = 1;
-    int start = containersConfig.indexOf("=") + 1;
-    int end = -1;
-
-    Resource used = Resource.newInstance(0, 0);
-    Resource pending = Resource.newInstance(0, 0);
-    Priority pri = Priority.newInstance(0);
-
-    while (start < containersConfig.length()) {
-      while (start < containersConfig.length()
-          && containersConfig.charAt(start) != '(') {
-        start++;
-      }
-      if (start >= containersConfig.length()) {
-        throw new IllegalArgumentException(
-            "Error containers specification, line=" + containersConfig);
-      }
-      end = start + 1;
-      while (end < containersConfig.length()
-          && containersConfig.charAt(end) != ')') {
-        end++;
-      }
-      if (end >= containersConfig.length()) {
-        throw new IllegalArgumentException(
-            "Error containers specification, line=" + containersConfig);
-      }
-
-      // now we found start/end, get container values
-      String[] values = containersConfig.substring(start + 1, end).split(",");
-      if (values.length < 6 || values.length > 8) {
-        throw new IllegalArgumentException("Format to define container is:"
-            + "(priority,resource,host,expression,repeat,reserved, pending)");
-      }
-      pri.setPriority(Integer.valueOf(values[0]));
-      Resource res = parseResourceFromString(values[1]);
-      NodeId host = NodeId.newInstance(values[2], 1);
-      String label = values[3];
-      String userName = "user";
-      int repeat = Integer.valueOf(values[4]);
-      boolean reserved = Boolean.valueOf(values[5]);
-      if (values.length >= 7) {
-        Resources.addTo(pending, parseResourceFromString(values[6]));
-      }
-      if (values.length == 8) {
-        userName = values[7];
-      }
-
-      for (int i = 0; i < repeat; i++) {
-        Container c = mock(Container.class);
-        Resources.addTo(used, res);
-        when(c.getResource()).thenReturn(res);
-        when(c.getPriority()).thenReturn(pri);
-        SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c);
-        RMContainerImpl rmc = mock(RMContainerImpl.class);
-        when(rmc.getAllocatedSchedulerKey()).thenReturn(sk);
-        when(rmc.getAllocatedNode()).thenReturn(host);
-        when(rmc.getNodeLabelExpression()).thenReturn(label);
-        when(rmc.getAllocatedResource()).thenReturn(res);
-        when(rmc.getContainer()).thenReturn(c);
-        when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
-        when(rmc.getQueueName()).thenReturn(queueName);
-        final ContainerId cId = ContainerId.newContainerId(attemptId,
-            containerId);
-        when(rmc.getContainerId()).thenReturn(cId);
-        doAnswer(new Answer<Integer>() {
-          @Override
-          public Integer answer(InvocationOnMock invocation) throws Throwable {
-            return cId.compareTo(
-                ((RMContainer) invocation.getArguments()[0]).getContainerId());
-          }
-        }).when(rmc).compareTo(any(RMContainer.class));
-
-        if (containerId == 1) {
-          when(rmc.isAMContainer()).thenReturn(true);
-          when(app.getAMResource(label)).thenReturn(res);
-          when(app.getAppAMNodePartitionName()).thenReturn(label);
-        }
-
-        if (reserved) {
-          reservedContainers.add(rmc);
-          when(rmc.getReservedResource()).thenReturn(res);
-        } else {
-          liveContainers.add(rmc);
-        }
-
-        // Add container to scheduler-node
-        addContainerToSchedulerNode(host, rmc, reserved);
-
-        // If this is a non-exclusive allocation
-        String partition = null;
-        if (label.isEmpty()
-            && !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
-                .isEmpty()) {
-          LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
-          Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = queue
-              .getIgnoreExclusivityRMContainers();
-          if (!ignoreExclusivityContainers.containsKey(partition)) {
-            ignoreExclusivityContainers.put(partition,
-                new TreeSet<RMContainer>());
-          }
-          ignoreExclusivityContainers.get(partition).add(rmc);
-        }
-        LOG.debug("add container to app=" + attemptId + " res=" + res + " node="
-            + host + " nodeLabelExpression=" + label + " partition="
-            + partition);
-
-        containerId++;
-      }
-
-      // If app has 0 container, and it has only pending, still make sure to
-      // update label.
-      if (repeat == 0) {
-        when(app.getAppAMNodePartitionName()).thenReturn(label);
-      }
-
-      // Some more app specific aggregated data can be better filled here.
-      when(app.getPriority()).thenReturn(pri);
-      when(app.getUser()).thenReturn(userName);
-      when(app.getCurrentConsumption()).thenReturn(used);
-      when(app.getCurrentReservation())
-          .thenReturn(Resources.createResource(0, 0));
-
-      Map<String, Resource> pendingForDefaultPartition =
-          new HashMap<String, Resource>();
-      // Add for default partition for now.
-      pendingForDefaultPartition.put(label, pending);
-      when(app.getTotalPendingRequestsPerPartition())
-          .thenReturn(pendingForDefaultPartition);
-
-      // need to set pending resource in resource usage as well
-      ResourceUsage ru = Mockito.spy(new ResourceUsage());
-      ru.setUsed(label, used);
-      when(ru.getCachedUsed(anyString())).thenReturn(used);
-      when(app.getAppAttemptResourceUsage()).thenReturn(ru);
-      when(app.getSchedulingResourceUsage()).thenReturn(ru);
-
-      start = end + 1;
-    }
-  }
-
-  /**
-   * Format is:
-   * <pre>
-   * queueName\t  // app1
-   * (priority,resource,host,expression,#repeat,reserved)
-   * (priority,resource,host,expression,#repeat,reserved);
-   * queueName\t  // app2
-   * </pre>
-   */
-  private void mockApplications(String appsConfig) {
-    int id = 1;
-    HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>();
-    HashMap<String, HashMap<String, HashMap<String, ResourceUsage>>> userResourceUsagePerLabel = new HashMap<>();
-    LeafQueue queue = null;
-    int mulp = -1;
-    for (String a : appsConfig.split(";")) {
-      String[] strs = a.split("\t");
-      String queueName = strs[0];
-      if (mulp <= 0 && strs.length > 2 && strs[2] != null) {
-        mulp = 100 / (new Integer(strs[2]).intValue());
-      }
-
-      // get containers
-      List<RMContainer> liveContainers = new ArrayList<RMContainer>();
-      List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
-      ApplicationId appId = ApplicationId.newInstance(0L, id);
-      ApplicationAttemptId appAttemptId = ApplicationAttemptId
-          .newInstance(appId, 1);
-
-      FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
-      when(app.getAMResource(anyString()))
-          .thenReturn(Resources.createResource(0, 0));
-      mockContainers(strs[1], app, appAttemptId, queueName, reservedContainers,
-          liveContainers);
-      LOG.debug("Application mock: queue: " + queueName + ", appId:" + appId);
-
-      when(app.getLiveContainers()).thenReturn(liveContainers);
-      when(app.getReservedContainers()).thenReturn(reservedContainers);
-      when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
-      when(app.getApplicationId()).thenReturn(appId);
-      when(app.getQueueName()).thenReturn(queueName);
-
-      // add to LeafQueue
-      queue = (LeafQueue) nameToCSQueues.get(queueName);
-      queue.getApplications().add(app);
-      queue.getAllApplications().add(app);
-      when(queue.getMinimumAllocation())
-      .thenReturn(Resource.newInstance(1,1));
-      when(app.getCSLeafQueue()).thenReturn(queue);
-
-      HashSet<String> users = userMap.get(queueName);
-      if (null == users) {
-        users = new HashSet<String>();
-        userMap.put(queueName, users);
-      }
-      users.add(app.getUser());
-
-      String label = app.getAppAMNodePartitionName();
-
-      // Get label to queue
-      HashMap<String, HashMap<String, ResourceUsage>> userResourceUsagePerQueue = userResourceUsagePerLabel
-          .get(label);
-      if (null == userResourceUsagePerQueue) {
-        userResourceUsagePerQueue = new HashMap<>();
-        userResourceUsagePerLabel.put(label, userResourceUsagePerQueue);
-      }
-
-      // Get queue to user based resource map
-      HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerQueue
-          .get(queueName);
-      if (null == userResourceUsage) {
-        userResourceUsage = new HashMap<>();
-        userResourceUsagePerQueue.put(queueName, userResourceUsage);
-      }
-
-      // Get user to its resource usage.
-      ResourceUsage usage = userResourceUsage.get(app.getUser());
-      if (null == usage) {
-        usage = new ResourceUsage();
-        userResourceUsage.put(app.getUser(), usage);
-      }
-
-      usage.incAMUsed(app.getAMResource(label));
-      usage.incUsed(app.getAppAttemptResourceUsage().getUsed(label));
-      id++;
-    }
-
-    for (String label : userResourceUsagePerLabel.keySet()) {
-      for (String queueName : userMap.keySet()) {
-        queue = (LeafQueue) nameToCSQueues.get(queueName);
-        // Currently we have user-limit test support only for default label.
-        Resource totResoucePerPartition = partitionToResource.get("");
-        Resource capacity = Resources.multiply(totResoucePerPartition,
-            queue.getQueueCapacities().getAbsoluteCapacity());
-        HashSet<String> users = userMap.get(queue.getQueueName());
-        //TODO: Refactor this test class to use queue path internally like
-        // CS does from now on
-        if (users == null) {
-          users = userMap.get(queue.getQueuePath());
-        }
-        when(queue.getAllUsers()).thenReturn(users);
-        Resource userLimit;
-        if (mulp > 0) {
-          userLimit = Resources.divideAndCeil(rc, capacity, mulp);
-        } else {
-          userLimit = Resources.divideAndCeil(rc, capacity,
-              users.size());
-        }
-        LOG.debug("Updating user-limit from mock: totResoucePerPartition="
-            + totResoucePerPartition + ", capacity=" + capacity
-            + ", users.size()=" + users.size() + ", userlimit= " + userLimit
-            + ",label= " + label + ",queueName= " + queueName);
-
-        HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerLabel
-            .get(label).get(queueName);
-        for (String userName : users) {
-          User user = new User(userName);
-          if (userResourceUsage != null) {
-            user.setResourceUsage(userResourceUsage.get(userName));
-          }
-          when(queue.getUser(eq(userName))).thenReturn(user);
-          when(queue.getResourceLimitForAllUsers(eq(userName),
-              any(Resource.class), anyString(), any(SchedulingMode.class)))
-                  .thenReturn(userLimit);
-        }
-      }
-    }
-  }
-
-  private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container,
-      boolean isReserved) {
-    SchedulerNode node = nodeIdToSchedulerNodes.get(nodeId);
-    assert node != null;
-
-    if (isReserved) {
-      when(node.getReservedContainer()).thenReturn(container);
-    } else {
-      node.getCopiedListOfRunningContainers().add(container);
-      Resources.subtractFrom(node.getUnallocatedResource(),
-          container.getAllocatedResource());
-    }
-  }
-
-  /**
-   * Format is:
-   * host1=partition[ res=resource];
-   * host2=partition[ res=resource];
-   */
-  private void mockSchedulerNodes(String schedulerNodesConfigStr)
-      throws IOException {
-    String[] nodesConfigStrArray = schedulerNodesConfigStr.split(";");
-    for (String p : nodesConfigStrArray) {
-      String[] arr = p.split(" ");
-
-      NodeId nodeId = NodeId.newInstance(arr[0].substring(0, arr[0].indexOf("=")), 1);
-      String partition = arr[0].substring(arr[0].indexOf("=") + 1, arr[0].length());
-
-      FiCaSchedulerNode sn = mock(FiCaSchedulerNode.class);
-      when(sn.getNodeID()).thenReturn(nodeId);
-      when(sn.getPartition()).thenReturn(partition);
-
-      Resource totalRes = Resources.createResource(0);
-      if (arr.length > 1) {
-        String res = arr[1];
-        if (res.contains("res=")) {
-          String resSring = res.substring(
-              res.indexOf("res=") + "res=".length());
-          totalRes = parseResourceFromString(resSring);
-        }
-      }
-      when(sn.getTotalResource()).thenReturn(totalRes);
-      when(sn.getUnallocatedResource()).thenReturn(Resources.clone(totalRes));
-
-      // TODO, add settings of killable resources when necessary
-      when(sn.getTotalKillableResources()).thenReturn(Resources.none());
-
-      List<RMContainer> liveContainers = new ArrayList<>();
-      when(sn.getCopiedListOfRunningContainers()).thenReturn(liveContainers);
-
-      nodeIdToSchedulerNodes.put(nodeId, sn);
-
-      LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition);
-    }
-  }
-
-  /**
-   * Format is:
-   * <pre>
-   * partition0=total_resource,exclusivity;
-   * partition1=total_resource,exclusivity;
-   * ...
-   * </pre>
-   */
-  private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException {
-    String[] partitionConfigArr = nodeLabelsConfigStr.split(";");
-    clusterResource = Resources.createResource(0);
-    for (String p : partitionConfigArr) {
-      String partitionName = p.substring(0, p.indexOf("="));
-      Resource res = parseResourceFromString(p.substring(p.indexOf("=") + 1,
-          p.indexOf(",")));
-      boolean exclusivity =
-          Boolean.valueOf(p.substring(p.indexOf(",") + 1, p.length()));
-      when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
-          .thenReturn(res);
-      when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
-
-      // add to partition to resource
-      partitionToResource.put(partitionName, res);
-      LOG.debug("add partition=" + partitionName + " totalRes=" + res
-          + " exclusivity=" + exclusivity);
-      Resources.addTo(clusterResource, res);
-    }
-
-    when(nlm.getClusterNodeLabelNames()).thenReturn(
-        partitionToResource.keySet());
-  }
-
-  private Resource parseResourceFromString(String p) {
-    String[] resource = p.split(":");
-    Resource res;
-    if (resource.length == 1) {
-      res = Resources.createResource(Integer.valueOf(resource[0]));
-    } else {
-      res = Resources.createResource(Integer.valueOf(resource[0]),
-          Integer.valueOf(resource[1]));
-      if (resource.length > 2) {
-        // Using the same order of resources from ResourceUtils, set resource
-        // informations.
-        ResourceInformation[] storedResourceInfo = ResourceUtils
-            .getResourceTypesArray();
-        for (int i = 2; i < resource.length; i++) {
-          res.setResourceInformation(storedResourceInfo[i].getName(),
-              ResourceInformation.newInstance(storedResourceInfo[i].getName(),
-                  storedResourceInfo[i].getUnits(),
-                  Integer.valueOf(resource[i])));
-        }
-      }
-    }
-    return res;
-  }
-
-  /**
-   * Format is:
-   * <pre>
-   * root (<partition-name-1>=[guaranteed max used pending (reserved)],<partition-name-2>=..);
-   * -A(...);
-   * --A1(...);
-   * --A2(...);
-   * -B...
-   * </pre>
-   * ";" splits queues, and there should no empty lines, no extra spaces
-   *
-   * For each queue, it has configurations to specify capacities (to each
-   * partition), format is:
-   * <pre>
-   * -<queueName> (<labelName1>=[guaranteed max used pending], \
-   *               <labelName2>=[guaranteed max used pending])
-   *              {key1=value1,key2=value2};  // Additional configs
-   * </pre>
-   */
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  private ParentQueue mockQueueHierarchy(String queueExprs) {
-    String[] queueExprArray = queueExprs.split(";");
-    ParentQueue rootQueue = null;
-    for (int idx = 0; idx < queueExprArray.length; idx++) {
-      String q = queueExprArray[idx];
-      CSQueue queue;
-
-      // Initialize queue
-      if (isParent(queueExprArray, idx)) {
-        ParentQueue parentQueue = mock(ParentQueue.class);
-        queue = parentQueue;
-        List<CSQueue> children = new ArrayList<CSQueue>();
-        when(parentQueue.getChildQueues()).thenReturn(children);
-        QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
-        when(policy.getConfigName()).thenReturn(
-            CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
-        when(parentQueue.getQueueOrderingPolicy()).thenReturn(policy);
-      } else {
-        LeafQueue leafQueue = mock(LeafQueue.class);
-        final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
-            new Comparator<FiCaSchedulerApp>() {
-              @Override
-              public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
-                if (a1.getPriority() != null
-                    && !a1.getPriority().equals(a2.getPriority())) {
-                  return a1.getPriority().compareTo(a2.getPriority());
-                }
-
-                int res = a1.getApplicationId()
-                    .compareTo(a2.getApplicationId());
-                return res;
-              }
-            });
-        when(leafQueue.getApplications()).thenReturn(apps);
-        when(leafQueue.getAllApplications()).thenReturn(apps);
-        OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
-        String opName = conf.get(CapacitySchedulerConfiguration.PREFIX
-            + CapacitySchedulerConfiguration.ROOT + "." + getQueueName(q)
-            + ".ordering-policy", "fifo");
-        if (opName.equals("fair")) {
-          so = Mockito.spy(new FairOrderingPolicy<FiCaSchedulerApp>());
-        }
-        when(so.getPreemptionIterator()).thenAnswer(new Answer() {
-          public Object answer(InvocationOnMock invocation) {
-            return apps.descendingIterator();
-          }
-        });
-        when(leafQueue.getOrderingPolicy()).thenReturn(so);
-
-        Map<String, TreeSet<RMContainer>> ignorePartitionContainers =
-            new HashMap<>();
-        when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
-            ignorePartitionContainers);
-        queue = leafQueue;
-      }
-
-      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-      when(queue.getReadLock()).thenReturn(lock.readLock());
-      setupQueue(queue, q, queueExprArray, idx);
-      if (queue.getQueuePath().equals(ROOT)) {
-        rootQueue = (ParentQueue) queue;
-      }
-    }
-    return rootQueue;
-  }
-
-  private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
-      int idx) {
-    LOG.debug("*** Setup queue, source=" + q);
-    String queuePath = null;
-
-    int myLevel = getLevel(q);
-    if (0 == myLevel) {
-      // It's root
-      when(queue.getQueuePath()).thenReturn(ROOT);
-      queuePath = ROOT;
-    }
-
-    String queueName = getQueueName(q);
-    when(queue.getQueueName()).thenReturn(queueName);
-
-    // Setup parent queue, and add myself to parentQueue.children-list
-    ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
-    if (null != parentQueue) {
-      when(queue.getParent()).thenReturn(parentQueue);
-      parentQueue.getChildQueues().add(queue);
-
-      // Setup my path
-      queuePath = parentQueue.getQueuePath() + "." + queueName;
-    }
-    when(queue.getQueuePath()).thenReturn(queuePath);
-
-    QueueCapacities qc = new QueueCapacities(0 == myLevel);
-    ResourceUsage ru = new ResourceUsage();
-    QueueResourceQuotas qr  = new QueueResourceQuotas();
-
-    when(queue.getQueueCapacities()).thenReturn(qc);
-    when(queue.getQueueResourceUsage()).thenReturn(ru);
-    when(queue.getQueueResourceQuotas()).thenReturn(qr);
-
-    LOG.debug("Setup queue, short name=" + queue.getQueueName() + " path="
-        + queue.getQueuePath());
-    LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
-        .getQueuePath()));
-
-    // Setup other fields like used resource, guaranteed resource, etc.
-    String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
-    for (String s : capacitySettingStr.split(",")) {
-      String partitionName = s.substring(0, s.indexOf("="));
-      String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
-      // Add a small epsilon to capacities to avoid truncate when doing
-      // Resources.multiply
-      float epsilon = 1e-6f;
-      Resource totResoucePerPartition = partitionToResource.get(partitionName);
-      float absGuaranteed = Resources.divide(rc, totResoucePerPartition,
-          parseResourceFromString(values[0].trim()), totResoucePerPartition)
-          + epsilon;
-      float absMax = Resources.divide(rc, totResoucePerPartition,
-          parseResourceFromString(values[1].trim()), totResoucePerPartition)
-          + epsilon;
-      float absUsed = Resources.divide(rc, totResoucePerPartition,
-          parseResourceFromString(values[2].trim()), totResoucePerPartition)
-          + epsilon;
-      float used = Resources.divide(rc, totResoucePerPartition,
-          parseResourceFromString(values[2].trim()),
-          parseResourceFromString(values[0].trim())) + epsilon;
-      Resource pending = parseResourceFromString(values[3].trim());
-      qc.setAbsoluteCapacity(partitionName, absGuaranteed);
-      qc.setAbsoluteMaximumCapacity(partitionName, absMax);
-      qc.setAbsoluteUsedCapacity(partitionName, absUsed);
-      qc.setUsedCapacity(partitionName, used);
-      qr.setEffectiveMaxResource(parseResourceFromString(values[1].trim()));
-      qr.setEffectiveMinResource(parseResourceFromString(values[0].trim()));
-      qr.setEffectiveMaxResource(partitionName,
-          parseResourceFromString(values[1].trim()));
-      qr.setEffectiveMinResource(partitionName,
-          parseResourceFromString(values[0].trim()));
-      when(queue.getUsedCapacity()).thenReturn(used);
-      when(queue.getEffectiveCapacity(partitionName))
-          .thenReturn(parseResourceFromString(values[0].trim()));
-      when(queue.getEffectiveMaxCapacity(partitionName))
-          .thenReturn(parseResourceFromString(values[1].trim()));
-      ru.setPending(partitionName, pending);
-      // Setup reserved resource if it contained by input config
-      Resource reserved = Resources.none();
-      if(values.length == 5) {
-        reserved = parseResourceFromString(values[4].trim());
-        ru.setReserved(partitionName, reserved);
-      }
-      if (!isParent(queueExprArray, idx)) {
-        LeafQueue lq = (LeafQueue) queue;
-        when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
-            isA(String.class), eq(false))).thenReturn(pending);
-        when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
-            isA(String.class), eq(true))).thenReturn(
-            Resources.subtract(pending, reserved));
-      }
-      ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
-
-      LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
-          + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
-          + ",abs_used" + absUsed + ",pending_resource=" + pending
-          + ", reserved_resource=" + reserved + "]");
-    }
-
-    // Setup preemption disabled
-    when(queue.getPreemptionDisabled()).thenReturn(
-        conf.getPreemptionDisabled(queuePath, false));
-
-    // Setup other queue configurations
-    Map<String, String> otherConfigs = getOtherConfigurations(
-        queueExprArray[idx]);
-    if (otherConfigs.containsKey("priority")) {
-      when(queue.getPriority()).thenReturn(
-          Priority.newInstance(Integer.valueOf(otherConfigs.get("priority"))));
-    } else {
-      // set queue's priority to 0 by default
-      when(queue.getPriority()).thenReturn(Priority.newInstance(0));
-    }
-
-    // Setup disable preemption of queues
-    if (otherConfigs.containsKey("disable_preemption")) {
-      when(queue.getPreemptionDisabled()).thenReturn(
-          Boolean.valueOf(otherConfigs.get("disable_preemption")));
-    }
-
-    //TODO: Refactor this test class to use queue path internally like CS
-    // does from now on
-    nameToCSQueues.put(queuePath, queue);
-    nameToCSQueues.put(queueName, queue);
-    when(cs.getQueue(eq(queuePath))).thenReturn(queue);
-    when(cs.getQueue(eq(queueName))).thenReturn(queue);
-    when(cs.normalizeQueueName(eq(queuePath))).thenReturn(queuePath);
-    when(cs.normalizeQueueName(eq(queueName))).thenReturn(queuePath);
-  }
-
-  /**
-   * Get additional queue's configurations
-   * @param queueExpr queue expr
-   * @return maps of configs
-   */
-  private Map<String, String> getOtherConfigurations(String queueExpr) {
-    if (queueExpr.contains("{")) {
-      int left = queueExpr.indexOf('{');
-      int right = queueExpr.indexOf('}');
-
-      if (right > left) {
-        Map<String, String> configs = new HashMap<>();
-
-        String subStr = queueExpr.substring(left + 1, right);
-        for (String kv : subStr.split(",")) {
-          if (kv.contains("=")) {
-            String key = kv.substring(0, kv.indexOf("="));
-            String value = kv.substring(kv.indexOf("=") + 1);
-            configs.put(key, value);
-          }
-        }
-
-        return configs;
-      }
-    }
-
-    return Collections.emptyMap();
-  }
-
-  /**
-   * Level of a queue is how many "-" at beginning, root's level is 0
-   */
-  private int getLevel(String q) {
-    int level = 0; // level = how many "-" at beginning
-    while (level < q.length() && q.charAt(level) == '-') {
-      level++;
-    }
-    return level;
-  }
-
-  private String getQueueName(String q) {
-    int idx = 0;
-    // find first != '-' char
-    while (idx < q.length() && q.charAt(idx) == '-') {
-      idx++;
-    }
-    if (idx == q.length()) {
-      throw new IllegalArgumentException("illegal input:" + q);
-    }
-    // name = after '-' and before '('
-    String name = q.substring(idx, q.indexOf('('));
-    if (name.isEmpty()) {
-      throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
-    }
-    if (name.contains(".")) {
-      throw new IllegalArgumentException("queue name shouldn't contain '.':"
-          + name);
-    }
-    return name;
-  }
-
-  private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) {
-    idx--;
-    while (idx >= 0) {
-      int level = getLevel(queueExprArray[idx]);
-      if (level < myLevel) {
-        String parentQueuName = getQueueName(queueExprArray[idx]);
-        return (ParentQueue) nameToCSQueues.get(parentQueuName);
-      }
-      idx--;
-    }
-
-    return null;
-  }
-
-  /**
-   * Get if a queue is ParentQueue
-   */
-  private boolean isParent(String[] queues, int idx) {
-    int myLevel = getLevel(queues[idx]);
-    idx++;
-    while (idx < queues.length && getLevel(queues[idx]) == myLevel) {
-      idx++;
-    }
-    if (idx >= queues.length || getLevel(queues[idx]) < myLevel) {
-      // It's a LeafQueue
-      return false;
-    } else {
-      return true;
-    }
-  }
-
-  public ApplicationAttemptId getAppAttemptId(int id) {
-    ApplicationId appId = ApplicationId.newInstance(0L, id);
-    ApplicationAttemptId appAttemptId =
-        ApplicationAttemptId.newInstance(appId, 1);
-    return appAttemptId;
-  }
-
-  public void checkContainerNodesInApp(FiCaSchedulerApp app,
-      int expectedContainersNumber, String host) {
-    NodeId nodeId = NodeId.newInstance(host, 1);
-    int num = 0;
-    for (RMContainer c : app.getLiveContainers()) {
-      if (c.getAllocatedNode().equals(nodeId)) {
-        num++;
-      }
-    }
-    for (RMContainer c : app.getReservedContainers()) {
-      if (c.getAllocatedNode().equals(nodeId)) {
-        num++;
-      }
-    }
-    Assert.assertEquals(expectedContainersNumber, num);
-  }
-
-  public FiCaSchedulerApp getApp(String queueName, int appId) {
-    for (FiCaSchedulerApp app : ((LeafQueue) cs.getQueue(queueName))
-        .getApplications()) {
-      if (app.getApplicationId().getId() == appId) {
-        return app;
-      }
-    }
-    return null;
-  }
-
-  public void checkAbsCapacities(CSQueue queue, String partition,
-      float guaranteed, float max, float used) {
-    QueueCapacities qc = queue.getQueueCapacities();
-    Assert.assertEquals(guaranteed, qc.getAbsoluteCapacity(partition), 1e-3);
-    Assert.assertEquals(max, qc.getAbsoluteMaximumCapacity(partition), 1e-3);
-    Assert.assertEquals(used, qc.getAbsoluteUsedCapacity(partition), 1e-3);
-  }
-
-  public void checkPendingResource(CSQueue queue, String partition, int pending) {
-    ResourceUsage ru = queue.getQueueResourceUsage();
-    Assert.assertEquals(pending, ru.getPending(partition).getMemorySize());
-  }
-
-  public void checkPriority(CSQueue queue, int expectedPriority) {
-    Assert.assertEquals(expectedPriority, queue.getPriority().getPriority());
-  }
-
-  public void checkReservedResource(CSQueue queue, String partition, int reserved) {
-    ResourceUsage ru = queue.getQueueResourceUsage();
-    Assert.assertEquals(reserved, ru.getReserved(partition).getMemorySize());
-  }
-
-  static class IsPreemptionRequestForQueueAndNode
-      implements ArgumentMatcher<ContainerPreemptEvent> {
-    private final ApplicationAttemptId appAttId;
-    private final String queueName;
-    private final NodeId nodeId;
-
-    IsPreemptionRequestForQueueAndNode(ApplicationAttemptId appAttId,
-        String queueName, NodeId nodeId) {
-      this.appAttId = appAttId;
-      this.queueName = queueName;
-      this.nodeId = nodeId;
-    }
-    @Override
-    public boolean matches(ContainerPreemptEvent cpe) {
-      return appAttId.equals(cpe.getAppId())
-          && queueName.equals(cpe.getContainer().getQueueName())
-          && nodeId.equals(cpe.getContainer().getAllocatedNode());
-    }
-    @Override
-    public String toString() {
-      return appAttId.toString();
-    }
-  }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
index f9fda99..1f82617 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@@ -39,7 +40,7 @@ public class TestPreemptionForQueueWithPriorities
     extends ProportionalCapacityPreemptionPolicyMockFramework {
   @Before
   public void setup() {
-    rc = new DefaultResourceCalculator();
+    resourceCalculator = new DefaultResourceCalculator();
     super.setup();
     policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
   }
@@ -82,13 +83,13 @@ public class TestPreemptionForQueueWithPriorities
 
     // 10 preempted from app1, 15 preempted from app2, and nothing preempted
     // from app3
-    verify(mDisp, times(10)).handle(argThat(
+    verify(eventHandler, times(10)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(15)).handle(argThat(
+    verify(eventHandler, times(15)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
@@ -135,13 +136,13 @@ public class TestPreemptionForQueueWithPriorities
 
     // 10 preempted from app1, 15 preempted from app2, and nothing preempted
     // from app3
-    verify(mDisp, times(10)).handle(argThat(
+    verify(eventHandler, times(10)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(15)).handle(argThat(
+    verify(eventHandler, times(15)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
@@ -185,10 +186,10 @@ public class TestPreemptionForQueueWithPriorities
     policy.editSchedule();
 
     // Nothing preempted
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -236,16 +237,16 @@ public class TestPreemptionForQueueWithPriorities
     // from app3/app4
     // (After preemption, a has 35 - 23 = 12, b has 25 - 6 = 19, so a:b after
     //  preemption is 1.58, close to 1.50)
-    verify(mDisp, times(23)).handle(argThat(
+    verify(eventHandler, times(23)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(6)).handle(argThat(
+    verify(eventHandler, times(6)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
   }
@@ -295,16 +296,16 @@ public class TestPreemptionForQueueWithPriorities
     // We suppose to preempt some resource from A, but now since queueA
     // disables preemption, so we need to preempt some resource from B and
     // some from C even if C has higher priority than A
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(9)).handle(argThat(
+    verify(eventHandler, times(9)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
-    verify(mDisp, times(19)).handle(argThat(
+    verify(eventHandler, times(19)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
   }
@@ -352,16 +353,16 @@ public class TestPreemptionForQueueWithPriorities
 
     // Preemption should first divide capacities between a / b, and b2 should
     // get less preemption than b1 (because b2 has higher priority)
-    verify(mDisp, times(6)).handle(argThat(
+    verify(eventHandler, times(6)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(1)).handle(argThat(
+    verify(eventHandler, times(1)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
-    verify(mDisp, times(13)).handle(argThat(
+    verify(eventHandler, times(13)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
-    verify(mDisp, times(10)).handle(argThat(
+    verify(eventHandler, times(10)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
   }
@@ -408,16 +409,16 @@ public class TestPreemptionForQueueWithPriorities
 
     // Preemption should first divide capacities between a / b, and b1 should
     // get less preemption than b2 (because b1 has higher priority)
-    verify(mDisp, times(3)).handle(argThat(
+    verify(eventHandler, times(3)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
-    verify(mDisp, times(2)).handle(argThat(
+    verify(eventHandler, times(2)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
   }
@@ -474,23 +475,23 @@ public class TestPreemptionForQueueWithPriorities
     // get less preemption than a1 (because a2 has higher priority). More
     // specifically, a2 will not get preempted since the resource preempted
     // from a1 can satisfy b already.
-    verify(mDisp, times(7)).handle(argThat(
+    verify(eventHandler, times(7)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
 
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
 
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
 
   @Test
   public void test3ResourceTypesInterQueuePreemption() throws IOException {
-    rc = new DominantResourceCalculator();
-    when(cs.getResourceCalculator()).thenReturn(rc);
+    resourceCalculator = new DominantResourceCalculator();
+    when(cs.getResourceCalculator()).thenReturn(resourceCalculator);
 
     // Initialize resource map
     String RESOURCE_1 = "res1";
@@ -535,7 +536,7 @@ public class TestPreemptionForQueueWithPriorities
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(1)).handle(argThat(
+    verify(eventHandler, times(1)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
@@ -589,10 +590,10 @@ public class TestPreemptionForQueueWithPriorities
     // b and c has same relativeAssigned=1.0f(idealAssigned / guaranteed),
     // since c has higher priority, c will be put in mostUnderServedQueue and
     // get all remain 30 capacity.
-    verify(mDisp, times(10)).handle(argThat(
+    verify(eventHandler, times(10)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index 0300e19..96e91d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -90,13 +91,13 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
 
     // 30 preempted from app1, 30 preempted from app4, and nothing preempted
     // from app2/app3
-    verify(mDisp, times(30)).handle(
+    verify(eventHandler, times(30)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
-    verify(mDisp, times(30)).handle(
+    verify(eventHandler, times(30)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
-    verify(mDisp, never()).handle(
+    verify(eventHandler, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
-    verify(mDisp, never()).handle(
+    verify(eventHandler, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
   }
 
@@ -146,9 +147,9 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
     policy.editSchedule();
 
     // No preemption happens
-    verify(mDisp, never()).handle(
+    verify(eventHandler, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
-    verify(mDisp, never()).handle(
+    verify(eventHandler, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
   }
 
@@ -195,9 +196,9 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
 
     // 30 preempted from app1, 30 preempted from app4, and nothing preempted
     // from app2/app3
-    verify(mDisp, times(20)).handle(
+    verify(eventHandler, times(20)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
-    verify(mDisp, times(30)).handle(
+    verify(eventHandler, times(30)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
   }
 
@@ -247,16 +248,16 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
     policy.editSchedule();
 
     // 4 from app1
-    verify(mDisp, times(4)).handle(
+    verify(eventHandler, times(4)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
     // 19 from app2-app5
-    verify(mDisp, times(19)).handle(
+    verify(eventHandler, times(19)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
-    verify(mDisp, times(19)).handle(
+    verify(eventHandler, times(19)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
-    verify(mDisp, times(19)).handle(
+    verify(eventHandler, times(19)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
-    verify(mDisp, times(19)).handle(
+    verify(eventHandler, times(19)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(5))));
   }
 
@@ -308,16 +309,16 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
     policy.editSchedule();
 
     // 4 from app1
-    verify(mDisp, times(19)).handle(
+    verify(eventHandler, times(19)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
     // 19 from app2-app5
-    verify(mDisp, times(19)).handle(
+    verify(eventHandler, times(19)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
-    verify(mDisp, times(19)).handle(
+    verify(eventHandler, times(19)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
-    verify(mDisp, times(20)).handle(
+    verify(eventHandler, times(20)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
-    verify(mDisp, times(20)).handle(
+    verify(eventHandler, times(20)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(5))));
   }
 
@@ -370,11 +371,11 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
     policy.editSchedule();
 
     // 10 preempted from app1, nothing preempted from app2-app3
-    verify(mDisp, times(10)).handle(
+    verify(eventHandler, times(10)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
-    verify(mDisp, never()).handle(
+    verify(eventHandler, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
-    verify(mDisp, never()).handle(
+    verify(eventHandler, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
   }
 
@@ -429,11 +430,11 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
     policy.editSchedule();
 
     // 15 will be preempted app2/app3
-    verify(mDisp, times(15)).handle(
+    verify(eventHandler, times(15)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
-    verify(mDisp, times(15)).handle(
+    verify(eventHandler, times(15)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
-    verify(mDisp, never()).handle(
+    verify(eventHandler, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
   }
 
@@ -499,13 +500,13 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
     policy.editSchedule();
 
     // 10 will be preempted from app1 (a1) /app4 (b2)
-    verify(mDisp, times(10)).handle(
+    verify(eventHandler, times(10)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
-    verify(mDisp, times(10)).handle(
+    verify(eventHandler, times(10)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
-    verify(mDisp, never()).handle(
+    verify(eventHandler, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
-    verify(mDisp, never()).handle(
+    verify(eventHandler, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
   }
 
@@ -559,9 +560,9 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(50)).handle(
+    verify(eventHandler, times(50)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
-    verify(mDisp, times(30)).handle(
+    verify(eventHandler, times(30)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
   }
 
@@ -610,13 +611,13 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
 
     // 30 preempted from app1, 30 preempted from app4, and nothing preempted
     // from app2/app3
-    verify(mDisp, times(30)).handle(
+    verify(eventHandler, times(30)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
-    verify(mDisp, times(30)).handle(
+    verify(eventHandler, times(30)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
-    verify(mDisp, never()).handle(
+    verify(eventHandler, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
-    verify(mDisp, never()).handle(
+    verify(eventHandler, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
   }
 
@@ -683,7 +684,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(7)).handle(
+    verify(eventHandler, times(7)).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
 
     riMap.remove(RESOURCE_1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java
index 5410931..6c72349 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,14 +83,14 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
 
     // Total 5 preempted from app1 at n1, don't preempt container from other
     // app/node
-    verify(mDisp, times(5)).handle(argThat(
+    verify(eventHandler, times(5)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(5)).handle(
+    verify(eventHandler, times(5)).handle(
         argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1),
             "root.a",
             NodeId.newInstance("n1", 1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -136,16 +137,16 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(15)).handle(argThat(
+    verify(eventHandler, times(15)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(10)).handle(
+    verify(eventHandler, times(10)).handle(
         argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
             NodeId.newInstance("n1", 1))));
-    verify(mDisp, times(5)).handle(
+    verify(eventHandler, times(5)).handle(
         argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
             NodeId.newInstance("n2", 1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -198,16 +199,16 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(15)).handle(argThat(
+    verify(eventHandler, times(15)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(
+    verify(eventHandler, times(0)).handle(
         argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
             NodeId.newInstance("n1", 1))));
-    verify(mDisp, times(15)).handle(
+    verify(eventHandler, times(15)).handle(
         argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
             NodeId.newInstance("n2", 1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -256,10 +257,10 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -308,10 +309,10 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
 
     // Total 4 preempted from app1 at n1, don't preempt container from other
     // app/node
-    verify(mDisp, times(4)).handle(argThat(
+    verify(eventHandler, times(4)).handle(argThat(
         new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "root.a",
             NodeId.newInstance("n1", 1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "root.a",
             NodeId.newInstance("n2", 1))));
   }
@@ -361,10 +362,10 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
 
     // Total 4 preempted from app1 at n1, don't preempt container from other
     // app/node
-    verify(mDisp, times(4)).handle(argThat(
+    verify(eventHandler, times(4)).handle(argThat(
         new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "root.a",
             NodeId.newInstance("n1", 1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "root.a",
             NodeId.newInstance("n2", 1))));
   }
@@ -415,16 +416,16 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
     policy.editSchedule();
 
     // No preemption should happen
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
             NodeId.newInstance("n1", 1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
             NodeId.newInstance("n2", 1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
             NodeId.newInstance("n3", 1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
             NodeId.newInstance("n4", 1))));
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
index 7e3d686..c42867a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -40,8 +41,8 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
   @Before
   public void setup() {
     super.setup();
-    rc = new DominantResourceCalculator();
-    when(cs.getResourceCalculator()).thenReturn(rc);
+    resourceCalculator = new DominantResourceCalculator();
+    when(cs.getResourceCalculator()).thenReturn(resourceCalculator);
     policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
   }
 
@@ -75,10 +76,10 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
     policy.editSchedule();
 
     // Preemption should happen in Queue b, preempt <10,20> to Queue a
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(5)).handle(argThat(
+    verify(eventHandler, times(5)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -121,7 +122,7 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(1)).handle(argThat(
+    verify(eventHandler, times(1)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
@@ -166,10 +167,10 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(1)).handle(argThat(
+    verify(eventHandler, times(1)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
index e2336fb..8077962 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,7 +36,7 @@ import static org.mockito.Mockito.verify;
  */
 public class TestProportionalCapacityPreemptionPolicyIntraQueue
     extends
-      ProportionalCapacityPreemptionPolicyMockFramework {
+    ProportionalCapacityPreemptionPolicyMockFramework {
   @Before
   public void setup() {
     super.setup();
@@ -101,10 +102,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
 
     // For queue B, app3 and app4 were of lower priority. Hence take 8
     // containers from them by hitting the intraQueuePreemptionDemand of 20%.
-    verify(mDisp, times(1)).handle(argThat(
+    verify(eventHandler, times(1)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
-    verify(mDisp, times(7)).handle(argThat(
+    verify(eventHandler, times(7)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
@@ -156,10 +157,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
@@ -215,16 +216,16 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     policy.editSchedule();
 
     // For queue B, none of the apps should be preempted.
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(5))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(6))));
   }
@@ -272,16 +273,16 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
 
     // For queue A/B, none of the apps should be preempted as used capacity
     // is under 50%.
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
   }
@@ -337,7 +338,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     // For queueB, eventhough app4 needs 100 resources, only 30 resources were
     // preempted. (max is 50% of guaranteed cap of any queue
     // "maxIntraQueuePreemptable")
-    verify(mDisp, times(30)).handle(argThat(
+    verify(eventHandler, times(30)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
@@ -391,7 +392,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
 
     // For queue B eventhough app4 needs 100 resources, only 10 resources were
     // preempted. This is the 10% limit of TOTAL_PREEMPTION_PER_ROUND.
-    verify(mDisp, times(10)).handle(argThat(
+    verify(eventHandler, times(10)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
@@ -448,10 +449,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     // As per intra queue preemption algorithm, 20 more containers were needed
     // for app2 (in queue a). Inter queue pre-emption had already preselected 9
     // containers and hence preempted only 11 more.
-    verify(mDisp, times(20)).handle(argThat(
+    verify(eventHandler, times(20)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -502,10 +503,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     policy.editSchedule();
 
     // Ensure that only 9 containers are preempted from app2 (sparing 1 AM)
-    verify(mDisp, times(11)).handle(argThat(
+    verify(eventHandler, times(11)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(9)).handle(argThat(
+    verify(eventHandler, times(9)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -552,10 +553,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     policy.editSchedule();
 
     // Make sure that app1's Am container is spared. Only 9/10 is preempted.
-    verify(mDisp, times(9)).handle(argThat(
+    verify(eventHandler, times(9)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, never()).handle(argThat(
+    verify(eventHandler, never()).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -595,7 +596,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     policy.editSchedule();
 
     // Ensure there are 0 preemptions since only one app is running in queue.
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
@@ -638,13 +639,13 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     policy.editSchedule();
 
     // Complete demand request from QueueB for 20 resource must be preempted.
-    verify(mDisp, times(20)).handle(argThat(
+    verify(eventHandler, times(20)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
@@ -703,11 +704,11 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     policy.editSchedule();
 
     // 20 preempted from app1
-    verify(mDisp, times(20))
+    verify(eventHandler, times(20))
         .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
-    verify(mDisp, never())
+    verify(eventHandler, never())
         .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
-    verify(mDisp, never())
+    verify(eventHandler, never())
         .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
   }
 
@@ -785,26 +786,26 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
 
     // High priority app in queueA has 30 resource demand. But low priority
     // app has only 5 resource. Hence preempt 4 here sparing AM.
-    verify(mDisp, times(4)).handle(argThat(
+    verify(eventHandler, times(4)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
     // Multiple high priority apps has demand  of 17. This will be preempted
     // from another set of low priority apps.
-    verify(mDisp, times(4)).handle(argThat(
+    verify(eventHandler, times(4)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
-    verify(mDisp, times(9)).handle(argThat(
+    verify(eventHandler, times(9)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(6))));
-    verify(mDisp, times(4)).handle(argThat(
+    verify(eventHandler, times(4)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(5))));
     // Only 3 resources will be freed in this round for queue C as we
     // are trying to save AM container.
-    verify(mDisp, times(2)).handle(argThat(
+    verify(eventHandler, times(2)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(10))));
-    verify(mDisp, times(1)).handle(argThat(
+    verify(eventHandler, times(1)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(11))));
   }
@@ -861,7 +862,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     // 14 more (5 is already running) eventhough demand is for 30. Ideally we
     // must preempt 15. But 15th container will bring user1's usage to 20 which
     // is same as user-limit. Hence skip 15th container.
-    verify(mDisp, times(14)).handle(argThat(
+    verify(eventHandler, times(14)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
@@ -927,19 +928,19 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
 
     // Label X: app3 has demand of 20 for label X. Hence app2 will loose
     // 4 (sparing AM) and 16 more from app1 till preemption limit is met.
-    verify(mDisp, times(16))
+    verify(eventHandler, times(16))
         .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
-    verify(mDisp, times(4))
+    verify(eventHandler, times(4))
         .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
 
     // Default Label:For a demand of 30, preempt from all low priority
     // apps of default label. 25 will be preempted as preemption limit is
     // met.
-    verify(mDisp, times(1))
+    verify(eventHandler, times(1))
         .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(8))));
-    verify(mDisp, times(2))
+    verify(eventHandler, times(2))
         .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(7))));
-    verify(mDisp, times(22))
+    verify(eventHandler, times(22))
         .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6))));
   }
 
@@ -1017,10 +1018,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
 
     // For queue B, app3 and app4 were of lower priority. Hence take 8
     // containers from them by hitting the intraQueuePreemptionDemand of 20%.
-    verify(mDisp, times(1)).handle(argThat(
+    verify(eventHandler, times(1)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
-    verify(mDisp, times(7)).handle(argThat(
+    verify(eventHandler, times(7)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
index 6e56cb5..eb9d218 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.junit.Before;
 import org.junit.Test;
@@ -85,7 +86,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(20)).handle(argThat(
+    verify(eventHandler, times(20)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
@@ -133,7 +134,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(5)).handle(argThat(
+    verify(eventHandler, times(5)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
 
@@ -156,7 +157,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(15)).handle(argThat(
+    verify(eventHandler, times(15)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
@@ -208,7 +209,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    verify(mDisp, times(20)).handle(argThat(
+    verify(eventHandler, times(20)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
@@ -258,18 +259,18 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering
 
     // app3 is the younges and also over its user limit. 5 should be preempted
     // from app3 until it comes down to user3's user limit.
-    verify(mDisp, times(5)).handle(argThat(
+    verify(eventHandler, times(5)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
 
     // User1's app2 is its youngest. 19 should be preempted from app2, leaving
     // only the AM
-    verify(mDisp, times(19)).handle(argThat(
+    verify(eventHandler, times(19)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
 
     // Preempt the remaining resource from User1's oldest app1.
-    verify(mDisp, times(1)).handle(argThat(
+    verify(eventHandler, times(1)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
index ba5f4d1..9fb08cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,7 +34,7 @@ import static org.mockito.Mockito.verify;
  */
 public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
     extends
-      ProportionalCapacityPreemptionPolicyMockFramework {
+    ProportionalCapacityPreemptionPolicyMockFramework {
   @Before
   public void setup() {
     super.setup();
@@ -94,7 +95,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2 needs more resource and its well under its user-limit. Hence preempt
     // resources from app1.
-    verify(mDisp, times(30)).handle(argThat(
+    verify(eventHandler, times(30)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
@@ -149,7 +150,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2 needs more resource. Since app1,2 are from same user, there wont be
     // any preemption.
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
@@ -206,7 +207,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2 needs more resource. Since app1,2 are from same user, there wont be
     // any preemption.
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
@@ -262,7 +263,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2 needs more resource and its well under its user-limit. Hence preempt
     // resources from app1 even though its priority is more than app2.
-    verify(mDisp, times(30)).handle(argThat(
+    verify(eventHandler, times(30)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
@@ -323,7 +324,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2/app4 needs more resource and its well under its user-limit. Hence
     // preempt resources from app3 (compare to app1, app3 has low priority).
-    verify(mDisp, times(9)).handle(argThat(
+    verify(eventHandler, times(9)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
@@ -384,16 +385,16 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2/app4 needs more resource and its well under its user-limit. Hence
     // preempt resources from app3 (compare to app1, app3 has low priority).
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
   }
@@ -451,7 +452,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2/app4 needs more resource and its well under its user-limit. Hence
     // preempt resources from app1 (compare to app3, app1 has low priority).
-    verify(mDisp, times(9)).handle(argThat(
+    verify(eventHandler, times(9)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
@@ -508,10 +509,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2/app4 needs more resource and its well under its user-limit. Hence
     // preempt resources from app1 (compare to app3, app1 has low priority).
-    verify(mDisp, times(4)).handle(argThat(
+    verify(eventHandler, times(4)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(4)).handle(argThat(
+    verify(eventHandler, times(4)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(5))));
   }
@@ -566,10 +567,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2 needs more resource and its well under its user-limit. Hence preempt
     // resources from app1.
-    verify(mDisp, times(9)).handle(argThat(
+    verify(eventHandler, times(9)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -626,10 +627,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2/app4 needs more resource and its well under its user-limit. Hence
     // preempt resources from app1 (compare to app3, app1 has low priority).
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -686,10 +687,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2/app4 needs more resource and its well under its user-limit. Hence
     // preempt resources from app1 (compare to app3, app1 has low priority).
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -746,10 +747,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2/app4 needs more resource and its well under its user-limit. Hence
     // preempt resources from app1 (compare to app3, app1 has low priority).
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -806,10 +807,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
     // app2 needs more resource and its well under its user-limit. Hence preempt
     // 3 resources (9GB) from app1. We will not preempt last container as it may
     // pull user's usage under its user-limit.
-    verify(mDisp, times(3)).handle(argThat(
+    verify(eventHandler, times(3)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
   }
@@ -868,10 +869,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
     // app2 needs more resource and its well under its user-limit. Hence preempt
     // 3 resources (9GB) from app1. We will not preempt last container as it may
     // pull user's usage under its user-limit.
-    verify(mDisp, times(3)).handle(argThat(
+    verify(eventHandler, times(3)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
+    verify(eventHandler, times(0)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
 
@@ -892,7 +893,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
     // app2 has priority demand within same user 'user1'. However user1's used
     // is already under UL. Hence no preemption. We will still get 3 container
     // while asserting as it was already selected in earlier round.
-    verify(mDisp, times(3)).handle(argThat(
+    verify(eventHandler, times(3)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
@@ -927,7 +928,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
 
     // app2 is right at its user limit and app1 needs one resource. Should
     // preempt 1 container.
-    verify(mDisp, times(1)).handle(argThat(
+    verify(eventHandler, times(1)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java
index 1f74487..940d11c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.junit.Before;
@@ -35,14 +36,14 @@ import static org.mockito.Mockito.when;
  */
 public class TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF
     extends
-      ProportionalCapacityPreemptionPolicyMockFramework {
+    ProportionalCapacityPreemptionPolicyMockFramework {
   @Before
   public void setup() {
     super.setup();
     conf.setBoolean(
         CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
-    rc = new DominantResourceCalculator();
-    when(cs.getResourceCalculator()).thenReturn(rc);
+    resourceCalculator = new DominantResourceCalculator();
+    when(cs.getResourceCalculator()).thenReturn(resourceCalculator);
     policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
   }
 
@@ -102,10 +103,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF
 
     // For queue B, app3 and app4 were of lower priority. Hence take 8
     // containers from them by hitting the intraQueuePreemptionDemand of 20%.
-    verify(mDisp, times(1)).handle(argThat(
+    verify(eventHandler, times(1)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
-    verify(mDisp, times(3)).handle(argThat(
+    verify(eventHandler, times(3)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
@@ -165,13 +166,13 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF
 
     // For queue B, app3 and app4 were of lower priority. Hence take 4
     // containers.
-    verify(mDisp, times(9)).handle(argThat(
+    verify(eventHandler, times(9)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
-    verify(mDisp, times(4)).handle(argThat(
+    verify(eventHandler, times(4)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
-    verify(mDisp, times(4)).handle(argThat(
+    verify(eventHandler, times(4)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(5))));
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
index 964a230..d59c854 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java
index ec71e67..2e7b01e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.log4j.Level;
@@ -57,10 +58,10 @@ public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
     policy.editSchedule();
 
     // I_A: A:30 B:35 C:35, preempt 5 from B and 15 from C to A
-    verify(mDisp, times(5)).handle(argThat(
+    verify(eventHandler, times(5)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
-    verify(mDisp, times(15)).handle(argThat(
+    verify(eventHandler, times(15)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
 
@@ -99,10 +100,10 @@ public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
     policy.editSchedule();
 
     // I_A: A:33 B:33 C:33, preempt 7 from B and 17 from C to A
-    verify(mDisp, times(7)).handle(argThat(
+    verify(eventHandler, times(7)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
-    verify(mDisp, times(17)).handle(argThat(
+    verify(eventHandler, times(17)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
 
@@ -142,7 +143,7 @@ public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
     policy.editSchedule();
 
     // I_A: A:15 B:42 C:43, preempt 7 from B and 17 from C to A
-    verify(mDisp, times(8)).handle(argThat(
+    verify(eventHandler, times(8)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
 
@@ -178,7 +179,7 @@ public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
     policy.editSchedule();
 
     // 21 containers will be preempted here
-    verify(mDisp, times(21)).handle(argThat(
+    verify(eventHandler, times(21)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.
             IsPreemptionRequestFor(getAppAttemptId(2))));
 
@@ -240,7 +241,7 @@ public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
     assertEquals(hasFifoSelector, true);
 
     // 21 containers will be preempted here
-    verify(mDisp, times(21)).handle(argThat(
+    verify(eventHandler, times(21)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.
             IsPreemptionRequestFor(getAppAttemptId(2))));
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ContainerSpecification.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ContainerSpecification.java
new file mode 100644
index 0000000..eb24c1a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ContainerSpecification.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework.parseResourceFromString;
+
+/**
+ * Describes one container entry of a mocked application: its priority,
+ * allocated and pending resources, node, label, repeat count, reservation
+ * flag and owning user. Instances are created through {@link Builder}.
+ */
+public class ContainerSpecification {
+  Priority priority;
+  Resource resource = Resource.newInstance(0, 0);
+  Resource pendingResource = Resource.newInstance(0, 0);
+  NodeId nodeId;
+  String label;
+  int repeat;
+  boolean reserved;
+  String username;
+
+  /**
+   * Copies the builder's values into this specification. Resources that were
+   * set on the builder are added onto the zero-valued defaults; unset ones
+   * leave the defaults untouched.
+   */
+  private ContainerSpecification(Builder builder) {
+    if (builder.resource != null) {
+      Resources.addTo(resource, builder.resource);
+    }
+    if (builder.pendingResource != null) {
+      Resources.addTo(pendingResource, builder.pendingResource);
+    }
+    this.priority = builder.priority;
+    this.nodeId = builder.nodeId;
+    this.label = builder.label;
+    this.repeat = builder.repeat;
+    this.reserved = builder.reserved;
+    this.username = builder.username;
+  }
+
+  /**
+   * Fluent builder that accepts every value as a raw string. The username
+   * defaults to {@code "user"}; all other values keep their Java defaults
+   * until the matching setter is called.
+   */
+  static class Builder {
+    private Priority priority;
+    private Resource resource;
+    private NodeId nodeId;
+    private String label;
+    private int repeat;
+    private boolean reserved;
+    private Resource pendingResource;
+    private String username = "user";
+
+    public static Builder create() {
+      return new Builder();
+    }
+
+    /**
+     * @param value priority as an integer string
+     * @throws NumberFormatException if {@code value} is not a valid integer
+     */
+    Builder withPriority(String value) {
+      // parseInt returns the primitive directly instead of boxing first.
+      this.priority = Priority.newInstance(Integer.parseInt(value));
+      return this;
+    }
+
+    /** @param value resource string understood by parseResourceFromString */
+    Builder withResource(String value) {
+      this.resource = parseResourceFromString(value);
+      return this;
+    }
+
+    /** @param value host name; the node id is always created with port 1 */
+    Builder withHostname(String value) {
+      this.nodeId = NodeId.newInstance(value, 1);
+      return this;
+    }
+
+    Builder withLabel(String value) {
+      this.label = value;
+      return this;
+    }
+
+    /**
+     * @param repeat repeat count as an integer string
+     * @throws NumberFormatException if {@code repeat} is not a valid integer
+     */
+    Builder withRepeat(String repeat) {
+      this.repeat = Integer.parseInt(repeat);
+      return this;
+    }
+
+    /** @param value "true" (ignoring case) sets the flag, all else clears it */
+    Builder withReserved(String value) {
+      this.reserved = Boolean.parseBoolean(value);
+      return this;
+    }
+
+    /** @param value resource string understood by parseResourceFromString */
+    Builder withPendingResource(String value) {
+      this.pendingResource = parseResourceFromString(value);
+      return this;
+    }
+
+    Builder withUsername(String value) {
+      this.username = value;
+      return this;
+    }
+
+    public ContainerSpecification build() {
+      return new ContainerSpecification(this);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplication.java
new file mode 100644
index 0000000..f40b4a0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplication.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Wraps a mocked {@link FiCaSchedulerApp} together with the live and reserved
+ * container lists that the mock hands out.
+ */
+class MockApplication {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MockApplication.class);
+  private final List<RMContainer> liveContainers = new ArrayList<>();
+  private final List<RMContainer> reservedContainers = new ArrayList<>();
+
+  private final ApplicationId appId;
+  final String containersConfig;
+  final String queueName;
+  ApplicationAttemptId appAttemptId;
+  FiCaSchedulerApp app;
+
+  /**
+   * @param id application id number
+   * @param containersConfig raw container configuration string, stored as is
+   * @param queueName name returned by the mocked app's getQueueName()
+   */
+  MockApplication(int id, String containersConfig, String queueName) {
+    this.appId = ApplicationId.newInstance(0L, id);
+    this.containersConfig = containersConfig;
+    this.queueName = queueName;
+
+    //dynamic fields
+    this.appAttemptId = ApplicationAttemptId
+        .newInstance(appId, 1);
+    //this must be the last step
+    setupInitialMocking(queueName);
+  }
+
+  /**
+   * Creates the app mock and stubs the getters that do not depend on
+   * container data. Must run after the id fields have been assigned.
+   */
+  private void setupInitialMocking(String queueName) {
+    this.app = mock(FiCaSchedulerApp.class);
+    when(app.getAMResource(anyString()))
+        .thenReturn(Resources.createResource(0, 0));
+    when(app.getLiveContainers()).thenReturn(liveContainers);
+    when(app.getReservedContainers()).thenReturn(reservedContainers);
+    when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
+    when(app.getApplicationId()).thenReturn(appId);
+    when(app.getQueueName()).thenReturn(queueName);
+  }
+
+  private void addLiveContainer(RMContainer c) {
+    this.liveContainers.add(c);
+  }
+
+  private void addReservedContainer(RMContainer c) {
+    this.reservedContainers.add(c);
+  }
+
+  /**
+   * Registers a mocked container with this application, with the scheduler
+   * node and, for a non-exclusive allocation, with the queue's
+   * ignore-exclusivity containers.
+   *
+   * @param mockContainer container mock together with its specification
+   * @param schedulerNode node the container is placed on
+   * @param queue leaf queue whose ignore-exclusivity map may be updated
+   */
+  void addMockContainer(MockContainer mockContainer,
+      FiCaSchedulerNode schedulerNode, LeafQueue queue) {
+    int containerId = mockContainer.containerId;
+    ContainerSpecification containerSpec = mockContainer.containerSpec;
+    RMContainer rmContainer = mockContainer.rmContainerMock;
+
+    // The container with id 1 supplies the AM resource and AM partition.
+    if (containerId == 1) {
+      when(app.getAMResource(containerSpec.label))
+          .thenReturn(containerSpec.resource);
+      when(app.getAppAMNodePartitionName()).thenReturn(containerSpec.label);
+    }
+
+    if (containerSpec.reserved) {
+      addReservedContainer(rmContainer);
+    } else {
+      addLiveContainer(rmContainer);
+    }
+
+    // Add container to scheduler-node
+    addContainerToSchedulerNode(schedulerNode, rmContainer,
+        containerSpec.reserved);
+
+    // If this is a non-exclusive allocation
+    String partition = null;
+    if (containerSpec.label.isEmpty()
+        && !(partition = schedulerNode.getPartition())
+        .isEmpty()) {
+      TreeSet<RMContainer> partitionContainers = queue
+          .getIgnoreExclusivityRMContainers()
+          .computeIfAbsent(partition, key -> new TreeSet<>());
+      partitionContainers.add(rmContainer);
+      LOG.info("Added an ignore-exclusivity container to partition {}, "
+          + "new size is: {}", partition, partitionContainers.size());
+    }
+    // Parameterized so the message is only formatted when debug is enabled.
+    LOG.debug("add container to app={} res={} node={} nodeLabelExpression={}"
+        + " partition={}", appAttemptId, containerSpec.resource,
+        containerSpec.nodeId, containerSpec.label, partition);
+  }
+
+  /**
+   * Stubs the application-level aggregates (priority, user, consumption,
+   * reservation, pending requests and resource usage) on the app mock.
+   *
+   * @param containerSpec supplies label, priority, username and pending
+   *                      resource
+   * @param usedResources returned as the app's current consumption and used
+   *                      resource
+   */
+  void addAggregatedContainerData(ContainerSpecification containerSpec,
+      Resource usedResources) {
+    // If app has 0 container, and it has only pending, still make sure to
+    // update label.
+    if (containerSpec.repeat == 0) {
+      when(app.getAppAMNodePartitionName()).thenReturn(containerSpec.label);
+    }
+
+    // Some more app specific aggregated data can be better filled here.
+    when(app.getPriority()).thenReturn(containerSpec.priority);
+    when(app.getUser()).thenReturn(containerSpec.username);
+    when(app.getCurrentConsumption()).thenReturn(usedResources);
+    when(app.getCurrentReservation())
+        .thenReturn(Resources.createResource(0, 0));
+
+    Map<String, Resource> pendingForDefaultPartition =
+        new HashMap<>();
+    // Add for default partition for now.
+    pendingForDefaultPartition.put(containerSpec.label,
+        containerSpec.pendingResource);
+    when(app.getTotalPendingRequestsPerPartition())
+        .thenReturn(pendingForDefaultPartition);
+
+    // Expose the used resources through the resource usage objects as well.
+    ResourceUsage ru = Mockito.spy(new ResourceUsage());
+    ru.setUsed(containerSpec.label, usedResources);
+    when(ru.getCachedUsed(anyString())).thenReturn(usedResources);
+    when(app.getAppAttemptResourceUsage()).thenReturn(ru);
+    when(app.getSchedulingResourceUsage()).thenReturn(ru);
+  }
+
+  /**
+   * Attaches the container to the node: as the node's reserved container when
+   * {@code isReserved} is set, otherwise as a running container whose
+   * allocation is subtracted from the node's unallocated resource.
+   */
+  private void addContainerToSchedulerNode(SchedulerNode node,
+      RMContainer container, boolean isReserved) {
+    assert node != null;
+
+    if (isReserved) {
+      when(node.getReservedContainer()).thenReturn(container);
+    } else {
+      node.getCopiedListOfRunningContainers().add(container);
+      Resources.subtractFrom(node.getUnallocatedResource(),
+          container.getAllocatedResource());
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java
new file mode 100644
index 0000000..b168612
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+class MockApplications {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      MockApplications.class);
+
+  private final String config;
+  private final ResourceCalculator resourceCalculator;
+  private final Map<String, CSQueue> nameToCSQueues;
+  private final Map<String, Resource> partitionToResource;
+  private final Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes;
+  private final Map<String, Set<String>> userMap = new HashMap<>();
+  private final Map<String, Map<String, Map<String, ResourceUsage>>> userResourceUsagePerLabel = new HashMap<>();
+  private int id = 1;
+
+  MockApplications(String appsConfig,
+      ResourceCalculator resourceCalculator,
+      Map<String, CSQueue> nameToCSQueues,
+      Map<String, Resource> partitionToResource,
+      Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes) {
+    this.config = appsConfig;
+    this.resourceCalculator = resourceCalculator;
+    this.nameToCSQueues = nameToCSQueues;
+    this.partitionToResource = partitionToResource;
+    this.nodeIdToSchedulerNodes = nodeIdToSchedulerNodes;
+    init();
+  }
+
+  /**
+   * Parses the applications config and registers one mock application per entry.
+   * Entries are separated by ";", the components of an entry by tabs. Format is:
+   * <pre>
+   * queueName\t  // app1
+   * (priority,resource,host,expression,#repeat,reserved)
+   * (priority,resource,host,expression,#repeat,reserved);
+   * queueName\t  // app2
+   * </pre>
+   * An optional third component is parsed as an integer percentage; as long as no
+   * positive divisor is known, 100 / percentage becomes the user-limit divisor.
+   */
+  private void init() {
+    int mulp = -1;
+    for (String appConfig : config.split(";")) {
+      String[] appConfigComponents = appConfig.split("\t");
+      if (appConfigComponents.length < 2) {
+        throw new IllegalArgumentException(
+            "Error in application specification, line=" + appConfig);
+      }
+      if (mulp <= 0 && appConfigComponents.length > 2) {
+        LOG.info("Mulp value: {}", appConfigComponents[2]);
+        mulp = 100 / (Integer.parseInt(appConfigComponents[2]));
+      }
+      MockApplication mockApp =
+          new MockApplication(id, appConfigComponents[1], appConfigComponents[0]);
+      new MockContainers(mockApp, nameToCSQueues, nodeIdToSchedulerNodes);
+      add(mockApp);
+      id++;
+    }
+    setupUserResourceUsagePerLabel(resourceCalculator, mulp);
+  }
+
+  /**
+   * Registers the app in its leaf queue and accumulates its AM / used resources
+   * into the per-label, per-queue, per-user resource usage bookkeeping.
+   */
+  private void add(MockApplication mockApp) {
+    LeafQueue queue = (LeafQueue) nameToCSQueues.get(mockApp.queueName);
+    if (queue == null) {
+      throw new IllegalArgumentException("Unknown queue: " + mockApp.queueName);
+    }
+    queue.getApplications().add(mockApp.app);
+    queue.getAllApplications().add(mockApp.app);
+    when(queue.getMinimumAllocation()).thenReturn(Resource.newInstance(1, 1));
+    when(mockApp.app.getCSLeafQueue()).thenReturn(queue);
+
+    LOG.debug("Application mock: queue: {}, appId:{}", mockApp.queueName, mockApp.app);
+
+    String user = mockApp.app.getUser();
+    userMap.computeIfAbsent(mockApp.queueName, k -> new HashSet<>()).add(user);
+
+    // label -> queue name -> user name -> usage; missing levels are created on demand
+    String label = mockApp.app.getAppAMNodePartitionName();
+    ResourceUsage usage = userResourceUsagePerLabel
+        .computeIfAbsent(label, k -> new HashMap<>())
+        .computeIfAbsent(mockApp.queueName, k -> new HashMap<>())
+        .computeIfAbsent(user, k -> new ResourceUsage());
+
+    usage.incAMUsed(mockApp.app.getAMResource(label));
+    usage.incUsed(mockApp.app.getAppAttemptResourceUsage().getUsed(label));
+  }
+
+  /**
+   * Stubs, for every (label, queue) pair, the queue's users and their user limit.
+   */
+  private void setupUserResourceUsagePerLabel(ResourceCalculator resourceCalculator,
+      int mulp) {
+    // Currently we have user-limit test support only for default label.
+    Resource toResourcePartition = partitionToResource.get("");
+    for (String label : userResourceUsagePerLabel.keySet()) {
+      for (String queueName : userMap.keySet()) {
+        LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+        Resource capacity = Resources.multiply(toResourcePartition,
+            queue.getQueueCapacities().getAbsoluteCapacity());
+        //TODO: Refactor this test class to use queue path internally like
+        // CS does from now on
+        Set<String> users = userMap.get(queue.getQueueName());
+        if (users == null) {
+          users = userMap.get(queue.getQueuePath());
+        }
+        when(queue.getAllUsers()).thenReturn(users);
+        Resource userLimit = calculateUserLimit(resourceCalculator, mulp, capacity, users);
+        LOG.debug("Updating user-limit from mock: toResourcePartition={}, capacity={}, "
+            + "users.size()={}, userLimit= {},label= {},queueName= {}",
+            toResourcePartition, capacity, users.size(), userLimit, label, queueName);
+
+        setupUserToQueueSettings(label, queueName, queue, users, userLimit);
+      }
+    }
+  }
+
+  // Stubs queue.getUser() and queue.getResourceLimitForAllUsers() for each user.
+  private void setupUserToQueueSettings(String label, String queueName,
+      LeafQueue queue, Set<String> users, Resource userLimit) {
+    Map<String, ResourceUsage> userResourceUsage =
+        userResourceUsagePerLabel.get(label).get(queueName);
+    for (String userName : users) {
+      User user = new User(userName);
+      if (userResourceUsage != null) {
+        user.setResourceUsage(userResourceUsage.get(userName));
+      }
+      when(queue.getUser(eq(userName))).thenReturn(user);
+      when(queue.getResourceLimitForAllUsers(eq(userName), any(Resource.class),
+          anyString(), any(SchedulingMode.class))).thenReturn(userLimit);
+    }
+  }
+
+  // Divides the capacity by mulp when it is positive, otherwise by the number of users.
+  private Resource calculateUserLimit(ResourceCalculator resourceCalculator,
+      int mulp, Resource capacity, Set<String> users) {
+    int divisor = mulp > 0 ? mulp : users.size();
+    return Resources.divideAndCeil(resourceCalculator, capacity, divisor);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockContainer.java
new file mode 100644
index 0000000..95f7391
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockContainer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class MockContainer {
+  ContainerSpecification containerSpec;
+  public int containerId;
+  private MockApplication mockApp;
+  RMContainerImpl rmContainerMock;
+
+  MockContainer(ContainerSpecification containerSpec,
+      int containerId, MockApplication mockApp) {
+    this.containerSpec = containerSpec;
+    this.containerId = containerId;
+    this.mockApp = mockApp;
+    this.rmContainerMock = mock(RMContainerImpl.class);
+    init();
+  }
+
+  // Wires up rmContainerMock from the container specification and the owning app.
+  private void init() {
+    final ContainerId cId =
+        ContainerId.newContainerId(mockApp.appAttemptId, containerId);
+    when(rmContainerMock.getContainerId()).thenReturn(cId);
+    when(rmContainerMock.getApplicationAttemptId()).thenReturn(mockApp.appAttemptId);
+    when(rmContainerMock.getQueueName()).thenReturn(mockApp.queueName);
+
+    Container container = mock(Container.class);
+    when(container.getResource()).thenReturn(containerSpec.resource);
+    when(container.getPriority()).thenReturn(containerSpec.priority);
+    when(rmContainerMock.getContainer()).thenReturn(container);
+    // computed up front: extractFrom() calls the container mock, not allowed mid-stubbing
+    SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(container);
+    when(rmContainerMock.getAllocatedSchedulerKey()).thenReturn(schedulerKey);
+    when(rmContainerMock.getAllocatedNode()).thenReturn(containerSpec.nodeId);
+    when(rmContainerMock.getNodeLabelExpression()).thenReturn(containerSpec.label);
+    when(rmContainerMock.getAllocatedResource()).thenReturn(containerSpec.resource);
+
+    // compareTo() on the mock orders containers by their container ids
+    Answer<Integer> compareByContainerId = (InvocationOnMock invocation) ->
+        cId.compareTo(((RMContainer) invocation.getArguments()[0]).getContainerId());
+    doAnswer(compareByContainerId).when(rmContainerMock).compareTo(any(RMContainer.class));
+
+    // container id 1 is stubbed as the AM container
+    if (containerId == 1) {
+      when(rmContainerMock.isAMContainer()).thenReturn(true);
+    }
+    if (containerSpec.reserved) {
+      when(rmContainerMock.getReservedResource()).thenReturn(containerSpec.resource);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockContainers.java
new file mode 100644
index 0000000..5a4212d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockContainers.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.Map;
+
+class MockContainers {
+  private final MockApplication mockApp;
+  private final Map<String, CSQueue> nameToCSQueues;
+  private final Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes;
+
+  MockContainers(MockApplication mockApp,
+      Map<String, CSQueue> nameToCSQueues,
+      Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes) {
+    this.mockApp = mockApp;
+    this.nameToCSQueues = nameToCSQueues;
+    this.nodeIdToSchedulerNodes = nodeIdToSchedulerNodes;
+    init();
+  }
+
+  /**
+   * Parses the containers config of the application, scanning right after the first
+   * "=" (if any) for a sequence of "(...)" groups, and adds "repeat" mock containers
+   * to the application for each group.
+   * @throws IllegalArgumentException if a group is not closed by ")", stray text
+   *     follows the last group, or a group has fewer than 6 or more than 8 values
+   */
+  private void init() {
+    String containersConfig = mockApp.containersConfig;
+    int start = containersConfig.indexOf("=") + 1;
+    int containerId = 1;
+
+    while (start < containersConfig.length()) {
+      // locate the next "(...)" group; anything else left in the config is an error
+      start = containersConfig.indexOf('(', start);
+      if (start < 0) {
+        throw new IllegalArgumentException(
+            "Error in containers specification, line=" + containersConfig);
+      }
+      int end = containersConfig.indexOf(')', start + 1);
+      if (end < 0) {
+        throw new IllegalArgumentException(
+            "Error in containers specification, line=" + containersConfig);
+      }
+
+      // now we found start/end, get container values
+      String[] values = containersConfig.substring(start + 1, end).split(",");
+      if (values.length < 6 || values.length > 8) {
+        throw new IllegalArgumentException("Format to define container is:"
+            + "(priority,resource,host,label expression,repeat,reserved"
+            + "[,pending[,username]])");
+      }
+
+      ContainerSpecification.Builder builder = ContainerSpecification.Builder.create()
+          .withPriority(values[0])
+          .withResource(values[1])
+          .withHostname(values[2])
+          .withLabel(values[3])
+          .withRepeat(values[4])
+          .withReserved(values[5]);
+      // pending resource and username are optional trailing values
+      if (values.length >= 7) {
+        builder.withPendingResource(values[6]);
+      }
+      if (values.length == 8) {
+        builder.withUsername(values[7]);
+      }
+      ContainerSpecification containerSpec = builder.build();
+
+      Resource usedResources = Resource.newInstance(0, 0);
+      for (int i = 0; i < containerSpec.repeat; i++) {
+        Resources.addTo(usedResources, containerSpec.resource);
+        MockContainer mockContainer = new MockContainer(containerSpec, containerId, mockApp);
+        FiCaSchedulerNode schedulerNode = nodeIdToSchedulerNodes.get(containerSpec.nodeId);
+        LeafQueue queue = (LeafQueue) nameToCSQueues.get(mockApp.queueName);
+        mockApp.addMockContainer(mockContainer, schedulerNode, queue);
+        containerId++;
+      }
+      mockApp.addAggregatedContainerData(containerSpec, usedResources);
+      start = end + 1;
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockNodeLabelsManager.java
new file mode 100644
index 0000000..2be8a78
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockNodeLabelsManager.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework.parseResourceFromString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+class MockNodeLabelsManager {
+  private static final Logger LOG = LoggerFactory.getLogger(MockNodeLabelsManager.class);
+
+  private final String config;
+  private final Resource clusterResource;
+  private final Map<String, Resource> partitionToResource;
+  private final RMNodeLabelsManager nodeLabelsManager;
+
+  MockNodeLabelsManager(String config,
+      RMNodeLabelsManager nodeLabelsManager,
+      Map<String, Resource> partitionToResource) throws IOException {
+    this.config = config;
+    this.partitionToResource = partitionToResource;
+    this.clusterResource = Resources.createResource(0);
+    this.nodeLabelsManager = nodeLabelsManager;
+    this.parse();
+  }
+
+  /**
+   * Stubs the labels manager per partition and sums up the cluster resource. Format is:
+   * <pre>
+   * partition0=total_resource,exclusivity;
+   * partition1=total_resource,exclusivity;
+   * ...
+   * </pre>
+   */
+  private void parse() throws IOException {
+    for (String p : config.split(";")) {
+      int eqIdx = p.indexOf('=');
+      int commaIdx = p.indexOf(',');
+      if (eqIdx < 0 || commaIdx < eqIdx) {
+        throw new IllegalArgumentException(
+            "Error in partition specification, line=" + p);
+      }
+      String partitionName = p.substring(0, eqIdx);
+      Resource res = parseResourceFromString(p.substring(eqIdx + 1, commaIdx));
+      boolean exclusivity = Boolean.parseBoolean(p.substring(commaIdx + 1));
+      when(nodeLabelsManager.getResourceByLabel(eq(partitionName), any(Resource.class)))
+          .thenReturn(res);
+      when(nodeLabelsManager.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
+
+      // record the partition's resource and add it to the cluster total
+      partitionToResource.put(partitionName, res);
+      LOG.debug("add partition={} totalRes={} exclusivity={}", partitionName, res, exclusivity);
+      Resources.addTo(clusterResource, res);
+    }
+    when(nodeLabelsManager.getClusterNodeLabelNames()).thenReturn(partitionToResource.keySet());
+  }
+
+  public Resource getClusterResource() {
+    return clusterResource;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java
new file mode 100644
index 0000000..ae4ff5a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework.parseResourceFromString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class MockQueueHierarchy {
+  private static final Logger LOG = LoggerFactory.getLogger(MockQueueHierarchy.class);
+  private final String ROOT = CapacitySchedulerConfiguration.ROOT;
+  private final ParentQueue rootQueue;
+  private String config;
+  private final CapacityScheduler cs;
+  private CapacitySchedulerConfiguration conf;
+  private final ResourceCalculator resourceCalculator;
+  private final Map<String, CSQueue> nameToCSQueues;
+  private final Map<String, Resource> partitionToResource;
+
+  // All collaborators must be assigned before init() runs, because init() reads them.
+  MockQueueHierarchy(String config, CapacityScheduler cs,
+      CapacitySchedulerConfiguration conf, ResourceCalculator resourceCalculator,
+      Map<String, Resource> partitionToResource) {
+    this.nameToCSQueues = new HashMap<>();
+    this.partitionToResource = partitionToResource;
+    this.resourceCalculator = resourceCalculator;
+    this.conf = conf;
+    this.cs = cs;
+    this.config = config;
+    // builds the mocked queue tree from the config
+    this.rootQueue = init();
+  }
+
+  public ParentQueue getRootQueue() {
+    return rootQueue; // assigned once in the constructor from the result of init()
+  }
+
+  Map<String, CSQueue> getNameToCSQueues() {
+    return nameToCSQueues; // the internal map itself, not a copy
+  }
+
+  /**
+   * Builds the mocked queues described by the config and returns the root queue. Format is:
+   * <pre>
+   * root (<partition-name-1>=[guaranteed max used pending (reserved)],<partition-name-2>=..);
+   * -A(...);
+   * --A1(...);
+   * --A2(...);
+   * -B...
+   * </pre>
+   * ";" splits queues, and there should be no empty lines and no extra spaces
+   *
+   * For each queue, it has configurations to specify capacities (to each
+   * partition), format is:
+   * <pre>
+   * -<queueName> (<labelName1>=[guaranteed max used pending], \
+   *               <labelName2>=[guaranteed max used pending])
+   *              {key1=value1,key2=value2};  // Additional configs
+   * </pre>
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private ParentQueue init() {
+    String[] queueExprArray = config.split(";");
+    ParentQueue rootQueue = null;
+    for (int idx = 0; idx < queueExprArray.length; idx++) {
+      String q = queueExprArray[idx];
+      CSQueue queue;
+
+      // Initialize queue: a mocked ParentQueue or LeafQueue, as decided by isParent()
+      if (isParent(queueExprArray, idx)) {
+        ParentQueue parentQueue = mock(ParentQueue.class);
+        queue = parentQueue;
+        List<CSQueue> children = new ArrayList<>();
+        when(parentQueue.getChildQueues()).thenReturn(children);
+        QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
+        when(policy.getConfigName()).thenReturn(
+            CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+        when(parentQueue.getQueueOrderingPolicy()).thenReturn(policy);
+      } else {
+        LeafQueue leafQueue = mock(LeafQueue.class);
+        final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
+            new Comparator<FiCaSchedulerApp>() {
+              @Override
+              public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
+                if (a1.getPriority() != null
+                    && !a1.getPriority().equals(a2.getPriority())) {
+                  return a1.getPriority().compareTo(a2.getPriority());
+                }
+
+                return a1.getApplicationId() // tie-break on application id
+                    .compareTo(a2.getApplicationId());
+              }
+            });
+        when(leafQueue.getApplications()).thenReturn(apps);
+        when(leafQueue.getAllApplications()).thenReturn(apps);
+        OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
+        String opName = conf.get(CapacitySchedulerConfiguration.PREFIX
+            + CapacitySchedulerConfiguration.ROOT + "." + getQueueName(q)
+            + ".ordering-policy", "fifo");
+        if (opName.equals("fair")) {
+          so = Mockito.spy(new FairOrderingPolicy<>()); // spy replaces the plain mock
+        }
+        when(so.getPreemptionIterator()).thenAnswer(new Answer() {
+          public Object answer(InvocationOnMock invocation) {
+            return apps.descendingIterator(); // evaluated on each call against the live set
+          }
+        });
+        when(leafQueue.getOrderingPolicy()).thenReturn(so);
+
+        Map<String, TreeSet<RMContainer>> ignorePartitionContainers =
+            new HashMap<>();
+        when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
+            ignorePartitionContainers);
+        queue = leafQueue;
+      }
+
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      when(queue.getReadLock()).thenReturn(lock.readLock());
+      setupQueue(queue, q, queueExprArray, idx);
+      if (queue.getQueuePath().equals(ROOT)) { // path was stubbed by setupQueue()
+        rootQueue = (ParentQueue) queue;
+      }
+    }
+    return rootQueue; // stays null when no queue has the ROOT path
+  }
+
+  private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
+      int idx) {
+    LOG.debug("*** Setup queue, source=" + q);
+    String queuePath = null;
+
+    int myLevel = getLevel(q);
+    if (0 == myLevel) {
+      // It's root
+      when(queue.getQueuePath()).thenReturn(ROOT);
+      queuePath = ROOT;
+    }
+
+    String queueName = getQueueName(q);
+    when(queue.getQueueName()).thenReturn(queueName);
+
+    // Setup parent queue, and add myself to parentQueue.children-list
+    ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
+    if (null != parentQueue) {
+      when(queue.getParent()).thenReturn(parentQueue);
+      parentQueue.getChildQueues().add(queue);
+
+      // Setup my path
+      queuePath = parentQueue.getQueuePath() + "." + queueName;
+    }
+    when(queue.getQueuePath()).thenReturn(queuePath);
+
+    QueueCapacities qc = new QueueCapacities(0 == myLevel);
+    ResourceUsage ru = new ResourceUsage();
+    QueueResourceQuotas qr  = new QueueResourceQuotas();
+
+    when(queue.getQueueCapacities()).thenReturn(qc);
+    when(queue.getQueueResourceUsage()).thenReturn(ru);
+    when(queue.getQueueResourceQuotas()).thenReturn(qr);
+
+    LOG.debug("Setup queue, short name=" + queue.getQueueName() + " path="
+        + queue.getQueuePath());
+    LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
+        .getQueuePath()));
+
+    // Setup other fields like used resource, guaranteed resource, etc.
+    String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
+    for (String s : capacitySettingStr.split(",")) {
+      String partitionName = s.substring(0, s.indexOf("="));
+      String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
+      // Add a small epsilon to capacities to avoid truncate when doing
+      // Resources.multiply
+      float epsilon = 1e-6f;
+      Resource toResourcePerPartition = partitionToResource.get(partitionName);
+      float absGuaranteed = Resources.divide(resourceCalculator, toResourcePerPartition,
+          parseResourceFromString(values[0].trim()), toResourcePerPartition)
+          + epsilon;
+      float absMax = Resources.divide(resourceCalculator, toResourcePerPartition,
+          parseResourceFromString(values[1].trim()), toResourcePerPartition)
+          + epsilon;
+      float absUsed = Resources.divide(resourceCalculator, toResourcePerPartition,
+          parseResourceFromString(values[2].trim()), toResourcePerPartition)
+          + epsilon;
+      float used = Resources.divide(resourceCalculator, toResourcePerPartition,
+          parseResourceFromString(values[2].trim()),
+          parseResourceFromString(values[0].trim())) + epsilon;
+      Resource pending = parseResourceFromString(values[3].trim());
+      qc.setAbsoluteCapacity(partitionName, absGuaranteed);
+      qc.setAbsoluteMaximumCapacity(partitionName, absMax);
+      qc.setAbsoluteUsedCapacity(partitionName, absUsed);
+      qc.setUsedCapacity(partitionName, used);
+      qr.setEffectiveMaxResource(parseResourceFromString(values[1].trim()));
+      qr.setEffectiveMinResource(parseResourceFromString(values[0].trim()));
+      qr.setEffectiveMaxResource(partitionName,
+          parseResourceFromString(values[1].trim()));
+      qr.setEffectiveMinResource(partitionName,
+          parseResourceFromString(values[0].trim()));
+      when(queue.getUsedCapacity()).thenReturn(used);
+      when(queue.getEffectiveCapacity(partitionName))
+          .thenReturn(parseResourceFromString(values[0].trim()));
+      when(queue.getEffectiveMaxCapacity(partitionName))
+          .thenReturn(parseResourceFromString(values[1].trim()));
+      ru.setPending(partitionName, pending);
+      // Setup reserved resource if it contained by input config
+      Resource reserved = Resources.none();
+      if(values.length == 5) {
+        reserved = parseResourceFromString(values[4].trim());
+        ru.setReserved(partitionName, reserved);
+      }
+      if (!isParent(queueExprArray, idx)) {
+        LeafQueue lq = (LeafQueue) queue;
+        when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
+            isA(String.class), eq(false))).thenReturn(pending);
+        when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
+            isA(String.class), eq(true))).thenReturn(
+            Resources.subtract(pending, reserved));
+      }
+      ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
+
+      LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+          + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
+          + ",abs_used" + absUsed + ",pending_resource=" + pending
+          + ", reserved_resource=" + reserved + "]");
+    }
+
+    // Setup preemption disabled
+    when(queue.getPreemptionDisabled()).thenReturn(
+        conf.getPreemptionDisabled(queuePath, false));
+
+    // Setup other queue configurations
+    Map<String, String> otherConfigs = getOtherConfigurations(
+        queueExprArray[idx]);
+    if (otherConfigs.containsKey("priority")) {
+      when(queue.getPriority()).thenReturn(
+          Priority.newInstance(Integer.valueOf(otherConfigs.get("priority"))));
+    } else {
+      // set queue's priority to 0 by default
+      when(queue.getPriority()).thenReturn(Priority.newInstance(0));
+    }
+
+    // Setup disable preemption of queues
+    if (otherConfigs.containsKey("disable_preemption")) {
+      when(queue.getPreemptionDisabled()).thenReturn(
+          Boolean.valueOf(otherConfigs.get("disable_preemption")));
+    }
+
+    //TODO: Refactor this test class to use queue path internally like CS
+    // does from now on
+    nameToCSQueues.put(queuePath, queue);
+    nameToCSQueues.put(queueName, queue);
+    when(cs.getQueue(eq(queuePath))).thenReturn(queue);
+    when(cs.getQueue(eq(queueName))).thenReturn(queue);
+    when(cs.normalizeQueueName(eq(queuePath))).thenReturn(queuePath);
+    when(cs.normalizeQueueName(eq(queueName))).thenReturn(queuePath);
+  }
+
+  /**
+   * Parses the optional <code>{key=value,...}</code> section of a queue
+   * expression.
+   *
+   * @param queueExpr queue expression, possibly containing a brace-delimited,
+   *                  comma-separated list of key=value pairs
+   * @return the parsed pairs; an empty map when the expression has no '{' or
+   *         no '}' located after the first '{'
+   */
+  private Map<String, String> getOtherConfigurations(String queueExpr) {
+    int open = queueExpr.indexOf('{');
+    int close = queueExpr.indexOf('}');
+    // Nothing to parse without an opening brace followed by a closing one.
+    if (open < 0 || close <= open) {
+      return Collections.emptyMap();
+    }
+
+    Map<String, String> parsed = new HashMap<>();
+    String[] pairs = queueExpr.substring(open + 1, close).split(",");
+    for (String pair : pairs) {
+      int sep = pair.indexOf("=");
+      // Entries without '=' are ignored.
+      if (sep >= 0) {
+        parsed.put(pair.substring(0, sep), pair.substring(sep + 1));
+      }
+    }
+    return parsed;
+  }
+
+  /**
+   * Extracts the short queue name from a queue expression: the text after the
+   * leading '-' level markers and before the first '('.
+   *
+   * @param q queue expression
+   * @return the short queue name
+   * @throws IllegalArgumentException if the expression consists only of '-'
+   *         characters, contains no '(', yields an empty name, or yields a
+   *         name containing '.'
+   */
+  private String getQueueName(String q) {
+    int idx = 0;
+    // find first != '-' char
+    while (idx < q.length() && q.charAt(idx) == '-') {
+      idx++;
+    }
+    if (idx == q.length()) {
+      throw new IllegalArgumentException("illegal input:" + q);
+    }
+    // A missing '(' would otherwise surface as an opaque
+    // StringIndexOutOfBoundsException from substring(idx, -1).
+    int nameEnd = q.indexOf('(');
+    if (nameEnd < 0) {
+      throw new IllegalArgumentException(
+          "queue expression is missing '(':" + q);
+    }
+    // name = after '-' and before '('
+    String name = q.substring(idx, nameEnd);
+    if (name.isEmpty()) {
+      throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
+    }
+    if (name.contains(".")) {
+      throw new IllegalArgumentException("queue name shouldn't contain '.':"
+          + name);
+    }
+    return name;
+  }
+
+  /**
+   * Finds the parent of the queue at {@code idx}: scanning backwards from
+   * {@code idx - 1}, the first expression whose level is lower than
+   * {@code myLevel}. The queue is looked up by its short name in
+   * {@code nameToCSQueues}.
+   *
+   * @param queueExprArray all queue expressions
+   * @param idx index of the queue whose parent is wanted
+   * @param myLevel level of the queue at {@code idx}
+   * @return the parent queue, or null when no preceding expression has a
+   *         lower level (or the name is not registered in the map)
+   */
+  private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) {
+    for (int i = idx - 1; i >= 0; i--) {
+      String candidate = queueExprArray[i];
+      if (getLevel(candidate) < myLevel) {
+        return (ParentQueue) nameToCSQueues.get(getQueueName(candidate));
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get if a queue is ParentQueue.
+   *
+   * A queue is a parent exactly when the expression that directly follows it
+   * has a deeper level. This matches how {@code getParentQueue} resolves
+   * parents (nearest preceding expression with a lower level).
+   *
+   * @param queues all queue expressions
+   * @param idx index of the queue to inspect
+   * @return true if the queue at {@code idx} has at least one child
+   */
+  private boolean isParent(String[] queues, int idx) {
+    int next = idx + 1;
+    // Only the immediate successor can be a child of this queue. Skipping
+    // same-level siblings first would misreport a leaf as a parent whenever a
+    // later sibling has children (e.g. "-a; -b; --b1" must keep 'a' a leaf).
+    return next < queues.length
+        && getLevel(queues[next]) > getLevel(queues[idx]);
+  }
+
+  /**
+   * Computes the level of a queue expression, defined as the number of
+   * consecutive '-' characters at its beginning; root's level is 0.
+   *
+   * @param q queue expression
+   * @return the count of leading '-' characters
+   */
+  private int getLevel(String q) {
+    int dashes = 0;
+    for (char c : q.toCharArray()) {
+      if (c != '-') {
+        break;
+      }
+      dashes++;
+    }
+    return dashes;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockSchedulerNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockSchedulerNodes.java
new file mode 100644
index 0000000..cdea35a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockSchedulerNodes.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework.parseResourceFromString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Builds mocked {@link FiCaSchedulerNode} instances from a textual node
+ * configuration and exposes them keyed by {@link NodeId}.
+ */
+class MockSchedulerNodes {
+  private static final Logger LOG = LoggerFactory.getLogger(MockSchedulerNodes.class);
+  private static final String RES_PREFIX = "res=";
+
+  private final String config;
+  private final Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes = new HashMap<>();
+
+  /**
+   * @param config node configuration, see {@link #init()} for the format
+   * @throws IllegalArgumentException if an entry has no "host=partition" part
+   */
+  MockSchedulerNodes(String config) {
+    this.config = config;
+    init();
+  }
+
+  /**
+   * Format is:
+   * host1=partition[ res=resource];
+   * host2=partition[ res=resource];
+   *
+   * Every node is created with port 1, no killable resources and an empty
+   * list of running containers. Total resource defaults to 0 when "res=" is
+   * absent.
+   */
+  private void init() {
+    for (String p : config.split(";")) {
+      String[] arr = p.split(" ");
+      // split() returns an empty array for a blank-only entry.
+      String hostAndPartition = arr.length > 0 ? arr[0] : "";
+      int sep = hostAndPartition.indexOf('=');
+      if (sep < 0) {
+        // Fail with the offending entry instead of an index exception.
+        throw new IllegalArgumentException(
+            "node config entry must look like host=partition[ res=resource]: '"
+                + p + "'");
+      }
+
+      NodeId nodeId = NodeId.newInstance(hostAndPartition.substring(0, sep), 1);
+      String partition = hostAndPartition.substring(sep + 1);
+
+      FiCaSchedulerNode sn = mock(FiCaSchedulerNode.class);
+      when(sn.getNodeID()).thenReturn(nodeId);
+      when(sn.getPartition()).thenReturn(partition);
+
+      Resource totalRes = Resources.createResource(0);
+      if (arr.length > 1) {
+        String res = arr[1];
+        int resStart = res.indexOf(RES_PREFIX);
+        if (resStart >= 0) {
+          totalRes = parseResourceFromString(
+              res.substring(resStart + RES_PREFIX.length()));
+        }
+      }
+      when(sn.getTotalResource()).thenReturn(totalRes);
+      when(sn.getUnallocatedResource()).thenReturn(Resources.clone(totalRes));
+
+      // TODO, add settings of killable resources when necessary
+      when(sn.getTotalKillableResources()).thenReturn(Resources.none());
+
+      List<RMContainer> liveContainers = new ArrayList<>();
+      when(sn.getCopiedListOfRunningContainers()).thenReturn(liveContainers);
+
+      nodeIdToSchedulerNodes.put(nodeId, sn);
+
+      LOG.debug("add scheduler node, id={}, partition={}", nodeId, partition);
+    }
+  }
+
+  /**
+   * @return the mocked scheduler nodes keyed by node id (the internal map, not
+   *         a copy)
+   */
+  Map<NodeId, FiCaSchedulerNode> getNodeIdToSchedulerNodes() {
+    return nodeIdToSchedulerNodes;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ProportionalCapacityPreemptionPolicyMockFramework.java
new file mode 100644
index 0000000..024ec86
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicyForNodePartitions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.mockito.ArgumentMatcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.event.Event;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Base class for ProportionalCapacityPreemptionPolicy tests. It builds a
+ * mocked CapacityScheduler environment (node labels, scheduler nodes, queues
+ * and applications) from textual configurations and offers assertion helpers.
+ */
+public class ProportionalCapacityPreemptionPolicyMockFramework {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ProportionalCapacityPreemptionPolicyMockFramework.class);
+  private static final double ALLOWED_CAPACITY_DELTA = 1e-3;
+
+  private Map<String, CSQueue> nameToCSQueues;
+  private Map<String, Resource> partitionToResource;
+  private Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes;
+  private RMNodeLabelsManager nodeLabelsManager;
+  public RMContext rmContext;
+
+  public ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
+  public Clock mClock;
+  public CapacitySchedulerConfiguration conf;
+  public CapacityScheduler cs;
+  @SuppressWarnings("rawtypes")
+  public EventHandler<Event> eventHandler;
+  public ProportionalCapacityPreemptionPolicy policy;
+  private Resource clusterResource;
+  // Initialize resource map
+  public Map<String, ResourceInformation> riMap = new HashMap<>();
+
+  /**
+   * Puts the mandatory memory and vcores resource types into {@link #riMap}
+   * and re-initializes {@link ResourceUtils} from that map.
+   */
+  private void resetResourceInformationMap() {
+    // Initialize mandatory resources
+    ResourceInformation memory = ResourceInformation.newInstance(
+        ResourceInformation.MEMORY_MB.getName(),
+        ResourceInformation.MEMORY_MB.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    ResourceInformation vcores = ResourceInformation.newInstance(
+        ResourceInformation.VCORES.getName(),
+        ResourceInformation.VCORES.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    riMap.put(ResourceInformation.MEMORY_URI, memory);
+    riMap.put(ResourceInformation.VCORES_URI, vcores);
+
+    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+  }
+
+  /**
+   * Resets resource types, sets the root log level to DEBUG, creates the
+   * preemption configuration and the scheduler / RM context mocks, and
+   * re-creates the empty lookup maps.
+   */
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setup() {
+    resetResourceInformationMap();
+
+    org.apache.log4j.Logger.getRootLogger().setLevel(
+        org.apache.log4j.Level.DEBUG);
+
+    conf = new CapacitySchedulerConfiguration(new Configuration(false));
+    conf.setLong(
+        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
+    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+        3000);
+    // report "ideal" preempt
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        (float) 1.0);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        (float) 1.0);
+
+    mClock = mock(Clock.class);
+    cs = mock(CapacityScheduler.class);
+    when(cs.getResourceCalculator()).thenReturn(resourceCalculator);
+    when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
+    when(cs.getConfiguration()).thenReturn(conf);
+
+    nodeLabelsManager = mock(RMNodeLabelsManager.class);
+    eventHandler = mock(EventHandler.class);
+
+    rmContext = mock(RMContext.class);
+    when(rmContext.getNodeLabelManager()).thenReturn(nodeLabelsManager);
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    when(rmContext.getDispatcher()).thenReturn(dispatcher);
+    when(dispatcher.getEventHandler()).thenReturn(eventHandler);
+    when(cs.getRMContext()).thenReturn(rmContext);
+
+    partitionToResource = new HashMap<>();
+    nodeIdToSchedulerNodes = new HashMap<>();
+    nameToCSQueues = new HashMap<>();
+    clusterResource = Resource.newInstance(0, 0);
+  }
+
+  /**
+   * Restores the mandatory resource types after each test.
+   */
+  @After
+  public void cleanup() {
+    resetResourceInformationMap();
+  }
+
+  /**
+   * Builds the mocked environment without switching the scheduler mock to the
+   * DominantResourceCalculator.
+   *
+   * @param labelsConfig node labels configuration
+   * @param nodesConfig scheduler nodes configuration
+   * @param queuesConfig queue hierarchy configuration
+   * @param appsConfig applications configuration
+   * @throws IOException declared for callers; propagated from the mock setup
+   */
+  public void buildEnv(String labelsConfig, String nodesConfig,
+      String queuesConfig, String appsConfig) throws IOException {
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false);
+  }
+
+  /**
+   * Builds the mocked environment (labels, nodes, queues, applications) and
+   * creates the {@link #policy} under test.
+   *
+   * NOTE: when {@code useDominantResourceCalculator} is true only the
+   * calculator returned by the scheduler mock is switched; the
+   * {@link #resourceCalculator} field handed to the queue and application
+   * mocks is left unchanged.
+   *
+   * @param labelsConfig node labels configuration
+   * @param nodesConfig scheduler nodes configuration
+   * @param queuesConfig queue hierarchy configuration
+   * @param appsConfig applications configuration
+   * @param useDominantResourceCalculator whether the scheduler mock should
+   *        return a DominantResourceCalculator
+   * @throws IOException declared for callers; propagated from the mock setup
+   */
+  public void buildEnv(String labelsConfig, String nodesConfig,
+      String queuesConfig, String appsConfig,
+      boolean useDominantResourceCalculator) throws IOException {
+    if (useDominantResourceCalculator) {
+      when(cs.getResourceCalculator()).thenReturn(
+          new DominantResourceCalculator());
+    }
+
+    MockNodeLabelsManager mockNodeLabelsManager =
+        new MockNodeLabelsManager(labelsConfig,
+            nodeLabelsManager, partitionToResource);
+    clusterResource = mockNodeLabelsManager.getClusterResource();
+
+    MockSchedulerNodes mockSchedulerNodes =
+        new MockSchedulerNodes(nodesConfig);
+    nodeIdToSchedulerNodes = mockSchedulerNodes.getNodeIdToSchedulerNodes();
+    addNodeIdDataToScheduler();
+
+    ParentQueue root = parseQueueConfig(queuesConfig);
+
+    when(cs.getRootQueue()).thenReturn(root);
+    when(cs.getClusterResource()).thenReturn(clusterResource);
+    new MockApplications(appsConfig, resourceCalculator, nameToCSQueues,
+        partitionToResource, nodeIdToSchedulerNodes);
+
+    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+  }
+
+  /**
+   * Builds the mocked queue hierarchy and remembers its name-to-queue map.
+   *
+   * @param queuesConfig queue hierarchy configuration
+   * @return the root queue of the mocked hierarchy
+   */
+  private ParentQueue parseQueueConfig(String queuesConfig) {
+    MockQueueHierarchy mockQueueHierarchy =
+        new MockQueueHierarchy(queuesConfig, cs, conf, resourceCalculator,
+            partitionToResource);
+    this.nameToCSQueues = mockQueueHierarchy.getNameToCSQueues();
+    return mockQueueHierarchy.getRootQueue();
+  }
+
+  /**
+   * Makes the scheduler mock return the mocked nodes, both individually by
+   * node id and as the list of all nodes.
+   */
+  private void addNodeIdDataToScheduler() {
+    for (Map.Entry<NodeId, FiCaSchedulerNode> entry
+        : nodeIdToSchedulerNodes.entrySet()) {
+      when(cs.getSchedulerNode(entry.getKey())).thenReturn(entry.getValue());
+    }
+    List<FiCaSchedulerNode> allNodes = new ArrayList<>(
+        nodeIdToSchedulerNodes.values());
+    when(cs.getAllNodes()).thenReturn(allNodes);
+  }
+
+  /**
+   * Re-parses the queue configuration and installs the new root queue on the
+   * scheduler mock.
+   *
+   * @param queuesConfig queue hierarchy configuration
+   */
+  protected void updateQueueConfig(String queuesConfig) {
+    ParentQueue root = parseQueueConfig(queuesConfig);
+    when(cs.getRootQueue()).thenReturn(root);
+  }
+
+  //TODO this can probably be replaced with some parser logic already implemented somewhere
+  /**
+   * Parses a ':'-separated list of integers into a {@link Resource}. The first
+   * value is memory, the second (if present) vcores; any further values are
+   * assigned to the resource types at the same index of
+   * {@link ResourceUtils#getResourceTypesArray()}.
+   *
+   * @param resString resource string such as "10", "10:2" or "10:2:3"
+   * @return the parsed resource
+   * @throws NumberFormatException if a value is not a valid integer
+   * @throws IllegalArgumentException if more values are given than resource
+   *         types are registered
+   */
+  static Resource parseResourceFromString(String resString) {
+    String[] resource = resString.split(":");
+    if (resource.length == 1) {
+      return Resources.createResource(Integer.parseInt(resource[0]));
+    }
+
+    Resource res = Resources.createResource(Integer.parseInt(resource[0]),
+        Integer.parseInt(resource[1]));
+    if (resource.length > 2) {
+      // Using the same order of resources from ResourceUtils, set resource
+      // information.
+      ResourceInformation[] storedResourceInfo = ResourceUtils
+          .getResourceTypesArray();
+      if (resource.length > storedResourceInfo.length) {
+        // Fail with context instead of an ArrayIndexOutOfBoundsException.
+        throw new IllegalArgumentException("resource string '" + resString
+            + "' has " + resource.length + " values but only "
+            + storedResourceInfo.length + " resource types are registered");
+      }
+      for (int i = 2; i < resource.length; i++) {
+        res.setResourceInformation(storedResourceInfo[i].getName(),
+            ResourceInformation.newInstance(storedResourceInfo[i].getName(),
+                storedResourceInfo[i].getUnits(),
+                Integer.parseInt(resource[i])));
+      }
+    }
+    return res;
+  }
+
+  /**
+   * @param id application id number
+   * @return attempt 1 of the application with cluster timestamp 0 and the
+   *         given id
+   */
+  public ApplicationAttemptId getAppAttemptId(int id) {
+    ApplicationId appId = ApplicationId.newInstance(0L, id);
+    return ApplicationAttemptId.newInstance(appId, 1);
+  }
+
+  /**
+   * Asserts how many live plus reserved containers of the app are allocated
+   * on the given host (port 1).
+   *
+   * @param app application to inspect
+   * @param expectedContainersNumber expected number of containers on the host
+   * @param host host name of the node
+   */
+  protected void checkContainerNodesInApp(FiCaSchedulerApp app,
+      int expectedContainersNumber, String host) {
+    NodeId nodeId = NodeId.newInstance(host, 1);
+    int num = 0;
+    for (RMContainer c : app.getLiveContainers()) {
+      if (c.getAllocatedNode().equals(nodeId)) {
+        num++;
+      }
+    }
+    for (RMContainer c : app.getReservedContainers()) {
+      if (c.getAllocatedNode().equals(nodeId)) {
+        num++;
+      }
+    }
+    Assert.assertEquals(expectedContainersNumber, num);
+  }
+
+  /**
+   * Finds an application of a leaf queue by its numeric application id.
+   *
+   * @param queueName name of the queue; it is cast to LeafQueue
+   * @param appId numeric id of the application
+   * @return the matching application, or null if the queue has none
+   */
+  public FiCaSchedulerApp getApp(String queueName, int appId) {
+    for (FiCaSchedulerApp app : ((LeafQueue) cs.getQueue(queueName))
+        .getApplications()) {
+      if (app.getApplicationId().getId() == appId) {
+        return app;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Asserts the absolute guaranteed, maximum and used capacities of a queue
+   * for a partition, within a tolerance of {@code ALLOWED_CAPACITY_DELTA}.
+   *
+   * @param queue queue to inspect
+   * @param partition partition name
+   * @param guaranteed expected absolute capacity
+   * @param max expected absolute maximum capacity
+   * @param used expected absolute used capacity
+   */
+  protected void checkAbsCapacities(CSQueue queue, String partition,
+      float guaranteed, float max, float used) {
+    QueueCapacities qc = queue.getQueueCapacities();
+    Assert.assertEquals(guaranteed, qc.getAbsoluteCapacity(partition),
+        ALLOWED_CAPACITY_DELTA);
+    Assert.assertEquals(max, qc.getAbsoluteMaximumCapacity(partition),
+        ALLOWED_CAPACITY_DELTA);
+    Assert.assertEquals(used, qc.getAbsoluteUsedCapacity(partition),
+        ALLOWED_CAPACITY_DELTA);
+  }
+
+  /**
+   * Asserts the memory size of the queue's pending resource for a partition.
+   *
+   * @param queue queue to inspect
+   * @param partition partition name
+   * @param pending expected pending memory size
+   */
+  protected void checkPendingResource(CSQueue queue, String partition,
+      int pending) {
+    ResourceUsage ru = queue.getQueueResourceUsage();
+    Assert.assertEquals(pending, ru.getPending(partition).getMemorySize());
+  }
+
+  /**
+   * Asserts the priority of a queue.
+   *
+   * @param queue queue to inspect
+   * @param expectedPriority expected priority value
+   */
+  protected void checkPriority(CSQueue queue, int expectedPriority) {
+    Assert.assertEquals(expectedPriority, queue.getPriority().getPriority());
+  }
+
+  /**
+   * Asserts the memory size of the queue's reserved resource for a partition.
+   *
+   * @param queue queue to inspect
+   * @param partition partition name
+   * @param reserved expected reserved memory size
+   */
+  protected void checkReservedResource(CSQueue queue, String partition,
+      int reserved) {
+    ResourceUsage ru = queue.getQueueResourceUsage();
+    Assert.assertEquals(reserved, ru.getReserved(partition).getMemorySize());
+  }
+
+  /**
+   * Mockito argument matcher accepting a {@link ContainerPreemptEvent} whose
+   * application attempt, container queue name and container node all equal
+   * the expected values.
+   */
+  public static class IsPreemptionRequestForQueueAndNode
+      implements ArgumentMatcher<ContainerPreemptEvent> {
+    private final ApplicationAttemptId appAttId;
+    private final String queueName;
+    private final NodeId nodeId;
+
+    /**
+     * @param appAttId expected application attempt id
+     * @param queueName expected queue name of the container
+     * @param nodeId expected node the container is allocated on
+     */
+    public IsPreemptionRequestForQueueAndNode(ApplicationAttemptId appAttId,
+        String queueName, NodeId nodeId) {
+      this.appAttId = appAttId;
+      this.queueName = queueName;
+      this.nodeId = nodeId;
+    }
+    @Override
+    public boolean matches(ContainerPreemptEvent cpe) {
+      return appAttId.equals(cpe.getAppId())
+          && queueName.equals(cpe.getContainer().getQueueName())
+          && nodeId.equals(cpe.getContainer().getAllocatedNode());
+    }
+    @Override
+    public String toString() {
+      return appAttId.toString();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org