You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/12/01 22:40:34 UTC
[hadoop] branch trunk updated: YARN-10278: CapacityScheduler test
framework ProportionalCapacityPreemptionPolicyMockFramework. Contributed by
Szilard Nemeth (snemeth)
This is an automated email from the ASF dual-hosted git repository.
epayne pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new fa773a8 YARN-10278: CapacityScheduler test framework ProportionalCapacityPreemptionPolicyMockFramework. Contributed by Szilard Nemeth (snemeth)
fa773a8 is described below
commit fa773a832652ac6f2a24f93fc2bfc2ae251bfe1d
Author: Eric Payne <ep...@apache.org>
AuthorDate: Tue Dec 1 22:06:47 2020 +0000
YARN-10278: CapacityScheduler test framework ProportionalCapacityPreemptionPolicyMockFramework. Contributed by Szilard Nemeth (snemeth)
---
...ionalCapacityPreemptionPolicyMockFramework.java | 985 ---------------------
.../TestPreemptionForQueueWithPriorities.java | 67 +-
...lCapacityPreemptionPolicyForNodePartitions.java | 71 +-
...acityPreemptionPolicyForReservedContainers.java | 43 +-
...lCapacityPreemptionPolicyInterQueueWithDRF.java | 15 +-
...ortionalCapacityPreemptionPolicyIntraQueue.java | 85 +-
...cityPreemptionPolicyIntraQueueFairOrdering.java | 15 +-
...apacityPreemptionPolicyIntraQueueUserLimit.java | 55 +-
...lCapacityPreemptionPolicyIntraQueueWithDRF.java | 17 +-
...ionalCapacityPreemptionPolicyMockFramework.java | 1 +
...alCapacityPreemptionPolicyPreemptToBalance.java | 15 +-
.../mockframework/ContainerSpecification.java | 109 +++
.../capacity/mockframework/MockApplication.java | 165 ++++
.../capacity/mockframework/MockApplications.java | 182 ++++
.../capacity/mockframework/MockContainer.java | 78 ++
.../capacity/mockframework/MockContainers.java | 104 +++
.../mockframework/MockNodeLabelsManager.java | 85 ++
.../capacity/mockframework/MockQueueHierarchy.java | 397 +++++++++
.../capacity/mockframework/MockSchedulerNodes.java | 90 ++
...ionalCapacityPreemptionPolicyMockFramework.java | 318 +++++++
20 files changed, 1725 insertions(+), 1172 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
deleted file mode 100644
index 87f2f1f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ /dev/null
@@ -1,985 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceUtils;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.hadoop.yarn.event.Event;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class ProportionalCapacityPreemptionPolicyMockFramework {
- static final Logger LOG = LoggerFactory.getLogger(
- TestProportionalCapacityPreemptionPolicyForNodePartitions.class);
- final String ROOT = CapacitySchedulerConfiguration.ROOT;
-
- Map<String, CSQueue> nameToCSQueues = null;
- Map<String, Resource> partitionToResource = null;
- Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes = null;
- RMNodeLabelsManager nlm = null;
- RMContext rmContext = null;
-
- ResourceCalculator rc = new DefaultResourceCalculator();
- Clock mClock = null;
- CapacitySchedulerConfiguration conf = null;
- CapacityScheduler cs = null;
- @SuppressWarnings("rawtypes")
- EventHandler<Event> mDisp = null;
- ProportionalCapacityPreemptionPolicy policy = null;
- Resource clusterResource = null;
- // Initialize resource map
- Map<String, ResourceInformation> riMap = new HashMap<>();
-
- private void resetResourceInformationMap() {
- // Initialize mandatory resources
- ResourceInformation memory = ResourceInformation.newInstance(
- ResourceInformation.MEMORY_MB.getName(),
- ResourceInformation.MEMORY_MB.getUnits(),
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
- ResourceInformation vcores = ResourceInformation.newInstance(
- ResourceInformation.VCORES.getName(),
- ResourceInformation.VCORES.getUnits(),
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
- riMap.put(ResourceInformation.MEMORY_URI, memory);
- riMap.put(ResourceInformation.VCORES_URI, vcores);
-
- ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
- }
-
- @SuppressWarnings("unchecked")
- @Before
- public void setup() {
- resetResourceInformationMap();
-
- org.apache.log4j.Logger.getRootLogger().setLevel(
- org.apache.log4j.Level.DEBUG);
-
- conf = new CapacitySchedulerConfiguration(new Configuration(false));
- conf.setLong(
- CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
- conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
- 3000);
- // report "ideal" preempt
- conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
- (float) 1.0);
- conf.setFloat(
- CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
- (float) 1.0);
-
- mClock = mock(Clock.class);
- cs = mock(CapacityScheduler.class);
- when(cs.getResourceCalculator()).thenReturn(rc);
- when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
- when(cs.getConfiguration()).thenReturn(conf);
-
- nlm = mock(RMNodeLabelsManager.class);
- mDisp = mock(EventHandler.class);
-
- rmContext = mock(RMContext.class);
- when(rmContext.getNodeLabelManager()).thenReturn(nlm);
- Dispatcher disp = mock(Dispatcher.class);
- when(rmContext.getDispatcher()).thenReturn(disp);
- when(disp.getEventHandler()).thenReturn(mDisp);
- when(cs.getRMContext()).thenReturn(rmContext);
-
- partitionToResource = new HashMap<>();
- nodeIdToSchedulerNodes = new HashMap<>();
- nameToCSQueues = new HashMap<>();
- clusterResource = Resource.newInstance(0, 0);
- }
-
- @After
- public void cleanup() {
- resetResourceInformationMap();
- }
-
- public void buildEnv(String labelsConfig, String nodesConfig,
- String queuesConfig, String appsConfig) throws IOException {
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false);
- }
-
- public void buildEnv(String labelsConfig, String nodesConfig,
- String queuesConfig, String appsConfig,
- boolean useDominantResourceCalculator) throws IOException {
- if (useDominantResourceCalculator) {
- when(cs.getResourceCalculator()).thenReturn(
- new DominantResourceCalculator());
- }
- mockNodeLabelsManager(labelsConfig);
- mockSchedulerNodes(nodesConfig);
- for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
- when(cs.getSchedulerNode(nodeId)).thenReturn(
- nodeIdToSchedulerNodes.get(nodeId));
- }
- List<FiCaSchedulerNode> allNodes = new ArrayList<>(
- nodeIdToSchedulerNodes.values());
- when(cs.getAllNodes()).thenReturn(allNodes);
- ParentQueue root = mockQueueHierarchy(queuesConfig);
- when(cs.getRootQueue()).thenReturn(root);
- when(cs.getClusterResource()).thenReturn(clusterResource);
- mockApplications(appsConfig);
-
- policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs,
- mClock);
- }
-
- public void updateQueueConfig(String queuesConfig) {
- ParentQueue root = mockQueueHierarchy(queuesConfig);
- when(cs.getRootQueue()).thenReturn(root);
- }
-
- private void mockContainers(String containersConfig, FiCaSchedulerApp app,
- ApplicationAttemptId attemptId, String queueName,
- List<RMContainer> reservedContainers, List<RMContainer> liveContainers) {
- int containerId = 1;
- int start = containersConfig.indexOf("=") + 1;
- int end = -1;
-
- Resource used = Resource.newInstance(0, 0);
- Resource pending = Resource.newInstance(0, 0);
- Priority pri = Priority.newInstance(0);
-
- while (start < containersConfig.length()) {
- while (start < containersConfig.length()
- && containersConfig.charAt(start) != '(') {
- start++;
- }
- if (start >= containersConfig.length()) {
- throw new IllegalArgumentException(
- "Error containers specification, line=" + containersConfig);
- }
- end = start + 1;
- while (end < containersConfig.length()
- && containersConfig.charAt(end) != ')') {
- end++;
- }
- if (end >= containersConfig.length()) {
- throw new IllegalArgumentException(
- "Error containers specification, line=" + containersConfig);
- }
-
- // now we found start/end, get container values
- String[] values = containersConfig.substring(start + 1, end).split(",");
- if (values.length < 6 || values.length > 8) {
- throw new IllegalArgumentException("Format to define container is:"
- + "(priority,resource,host,expression,repeat,reserved, pending)");
- }
- pri.setPriority(Integer.valueOf(values[0]));
- Resource res = parseResourceFromString(values[1]);
- NodeId host = NodeId.newInstance(values[2], 1);
- String label = values[3];
- String userName = "user";
- int repeat = Integer.valueOf(values[4]);
- boolean reserved = Boolean.valueOf(values[5]);
- if (values.length >= 7) {
- Resources.addTo(pending, parseResourceFromString(values[6]));
- }
- if (values.length == 8) {
- userName = values[7];
- }
-
- for (int i = 0; i < repeat; i++) {
- Container c = mock(Container.class);
- Resources.addTo(used, res);
- when(c.getResource()).thenReturn(res);
- when(c.getPriority()).thenReturn(pri);
- SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c);
- RMContainerImpl rmc = mock(RMContainerImpl.class);
- when(rmc.getAllocatedSchedulerKey()).thenReturn(sk);
- when(rmc.getAllocatedNode()).thenReturn(host);
- when(rmc.getNodeLabelExpression()).thenReturn(label);
- when(rmc.getAllocatedResource()).thenReturn(res);
- when(rmc.getContainer()).thenReturn(c);
- when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
- when(rmc.getQueueName()).thenReturn(queueName);
- final ContainerId cId = ContainerId.newContainerId(attemptId,
- containerId);
- when(rmc.getContainerId()).thenReturn(cId);
- doAnswer(new Answer<Integer>() {
- @Override
- public Integer answer(InvocationOnMock invocation) throws Throwable {
- return cId.compareTo(
- ((RMContainer) invocation.getArguments()[0]).getContainerId());
- }
- }).when(rmc).compareTo(any(RMContainer.class));
-
- if (containerId == 1) {
- when(rmc.isAMContainer()).thenReturn(true);
- when(app.getAMResource(label)).thenReturn(res);
- when(app.getAppAMNodePartitionName()).thenReturn(label);
- }
-
- if (reserved) {
- reservedContainers.add(rmc);
- when(rmc.getReservedResource()).thenReturn(res);
- } else {
- liveContainers.add(rmc);
- }
-
- // Add container to scheduler-node
- addContainerToSchedulerNode(host, rmc, reserved);
-
- // If this is a non-exclusive allocation
- String partition = null;
- if (label.isEmpty()
- && !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
- .isEmpty()) {
- LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
- Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = queue
- .getIgnoreExclusivityRMContainers();
- if (!ignoreExclusivityContainers.containsKey(partition)) {
- ignoreExclusivityContainers.put(partition,
- new TreeSet<RMContainer>());
- }
- ignoreExclusivityContainers.get(partition).add(rmc);
- }
- LOG.debug("add container to app=" + attemptId + " res=" + res + " node="
- + host + " nodeLabelExpression=" + label + " partition="
- + partition);
-
- containerId++;
- }
-
- // If app has 0 container, and it has only pending, still make sure to
- // update label.
- if (repeat == 0) {
- when(app.getAppAMNodePartitionName()).thenReturn(label);
- }
-
- // Some more app specific aggregated data can be better filled here.
- when(app.getPriority()).thenReturn(pri);
- when(app.getUser()).thenReturn(userName);
- when(app.getCurrentConsumption()).thenReturn(used);
- when(app.getCurrentReservation())
- .thenReturn(Resources.createResource(0, 0));
-
- Map<String, Resource> pendingForDefaultPartition =
- new HashMap<String, Resource>();
- // Add for default partition for now.
- pendingForDefaultPartition.put(label, pending);
- when(app.getTotalPendingRequestsPerPartition())
- .thenReturn(pendingForDefaultPartition);
-
- // need to set pending resource in resource usage as well
- ResourceUsage ru = Mockito.spy(new ResourceUsage());
- ru.setUsed(label, used);
- when(ru.getCachedUsed(anyString())).thenReturn(used);
- when(app.getAppAttemptResourceUsage()).thenReturn(ru);
- when(app.getSchedulingResourceUsage()).thenReturn(ru);
-
- start = end + 1;
- }
- }
-
- /**
- * Format is:
- * <pre>
- * queueName\t // app1
- * (priority,resource,host,expression,#repeat,reserved)
- * (priority,resource,host,expression,#repeat,reserved);
- * queueName\t // app2
- * </pre>
- */
- private void mockApplications(String appsConfig) {
- int id = 1;
- HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>();
- HashMap<String, HashMap<String, HashMap<String, ResourceUsage>>> userResourceUsagePerLabel = new HashMap<>();
- LeafQueue queue = null;
- int mulp = -1;
- for (String a : appsConfig.split(";")) {
- String[] strs = a.split("\t");
- String queueName = strs[0];
- if (mulp <= 0 && strs.length > 2 && strs[2] != null) {
- mulp = 100 / (new Integer(strs[2]).intValue());
- }
-
- // get containers
- List<RMContainer> liveContainers = new ArrayList<RMContainer>();
- List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
- ApplicationId appId = ApplicationId.newInstance(0L, id);
- ApplicationAttemptId appAttemptId = ApplicationAttemptId
- .newInstance(appId, 1);
-
- FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
- when(app.getAMResource(anyString()))
- .thenReturn(Resources.createResource(0, 0));
- mockContainers(strs[1], app, appAttemptId, queueName, reservedContainers,
- liveContainers);
- LOG.debug("Application mock: queue: " + queueName + ", appId:" + appId);
-
- when(app.getLiveContainers()).thenReturn(liveContainers);
- when(app.getReservedContainers()).thenReturn(reservedContainers);
- when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
- when(app.getApplicationId()).thenReturn(appId);
- when(app.getQueueName()).thenReturn(queueName);
-
- // add to LeafQueue
- queue = (LeafQueue) nameToCSQueues.get(queueName);
- queue.getApplications().add(app);
- queue.getAllApplications().add(app);
- when(queue.getMinimumAllocation())
- .thenReturn(Resource.newInstance(1,1));
- when(app.getCSLeafQueue()).thenReturn(queue);
-
- HashSet<String> users = userMap.get(queueName);
- if (null == users) {
- users = new HashSet<String>();
- userMap.put(queueName, users);
- }
- users.add(app.getUser());
-
- String label = app.getAppAMNodePartitionName();
-
- // Get label to queue
- HashMap<String, HashMap<String, ResourceUsage>> userResourceUsagePerQueue = userResourceUsagePerLabel
- .get(label);
- if (null == userResourceUsagePerQueue) {
- userResourceUsagePerQueue = new HashMap<>();
- userResourceUsagePerLabel.put(label, userResourceUsagePerQueue);
- }
-
- // Get queue to user based resource map
- HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerQueue
- .get(queueName);
- if (null == userResourceUsage) {
- userResourceUsage = new HashMap<>();
- userResourceUsagePerQueue.put(queueName, userResourceUsage);
- }
-
- // Get user to its resource usage.
- ResourceUsage usage = userResourceUsage.get(app.getUser());
- if (null == usage) {
- usage = new ResourceUsage();
- userResourceUsage.put(app.getUser(), usage);
- }
-
- usage.incAMUsed(app.getAMResource(label));
- usage.incUsed(app.getAppAttemptResourceUsage().getUsed(label));
- id++;
- }
-
- for (String label : userResourceUsagePerLabel.keySet()) {
- for (String queueName : userMap.keySet()) {
- queue = (LeafQueue) nameToCSQueues.get(queueName);
- // Currently we have user-limit test support only for default label.
- Resource totResoucePerPartition = partitionToResource.get("");
- Resource capacity = Resources.multiply(totResoucePerPartition,
- queue.getQueueCapacities().getAbsoluteCapacity());
- HashSet<String> users = userMap.get(queue.getQueueName());
- //TODO: Refactor this test class to use queue path internally like
- // CS does from now on
- if (users == null) {
- users = userMap.get(queue.getQueuePath());
- }
- when(queue.getAllUsers()).thenReturn(users);
- Resource userLimit;
- if (mulp > 0) {
- userLimit = Resources.divideAndCeil(rc, capacity, mulp);
- } else {
- userLimit = Resources.divideAndCeil(rc, capacity,
- users.size());
- }
- LOG.debug("Updating user-limit from mock: totResoucePerPartition="
- + totResoucePerPartition + ", capacity=" + capacity
- + ", users.size()=" + users.size() + ", userlimit= " + userLimit
- + ",label= " + label + ",queueName= " + queueName);
-
- HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerLabel
- .get(label).get(queueName);
- for (String userName : users) {
- User user = new User(userName);
- if (userResourceUsage != null) {
- user.setResourceUsage(userResourceUsage.get(userName));
- }
- when(queue.getUser(eq(userName))).thenReturn(user);
- when(queue.getResourceLimitForAllUsers(eq(userName),
- any(Resource.class), anyString(), any(SchedulingMode.class)))
- .thenReturn(userLimit);
- }
- }
- }
- }
-
- private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container,
- boolean isReserved) {
- SchedulerNode node = nodeIdToSchedulerNodes.get(nodeId);
- assert node != null;
-
- if (isReserved) {
- when(node.getReservedContainer()).thenReturn(container);
- } else {
- node.getCopiedListOfRunningContainers().add(container);
- Resources.subtractFrom(node.getUnallocatedResource(),
- container.getAllocatedResource());
- }
- }
-
- /**
- * Format is:
- * host1=partition[ res=resource];
- * host2=partition[ res=resource];
- */
- private void mockSchedulerNodes(String schedulerNodesConfigStr)
- throws IOException {
- String[] nodesConfigStrArray = schedulerNodesConfigStr.split(";");
- for (String p : nodesConfigStrArray) {
- String[] arr = p.split(" ");
-
- NodeId nodeId = NodeId.newInstance(arr[0].substring(0, arr[0].indexOf("=")), 1);
- String partition = arr[0].substring(arr[0].indexOf("=") + 1, arr[0].length());
-
- FiCaSchedulerNode sn = mock(FiCaSchedulerNode.class);
- when(sn.getNodeID()).thenReturn(nodeId);
- when(sn.getPartition()).thenReturn(partition);
-
- Resource totalRes = Resources.createResource(0);
- if (arr.length > 1) {
- String res = arr[1];
- if (res.contains("res=")) {
- String resSring = res.substring(
- res.indexOf("res=") + "res=".length());
- totalRes = parseResourceFromString(resSring);
- }
- }
- when(sn.getTotalResource()).thenReturn(totalRes);
- when(sn.getUnallocatedResource()).thenReturn(Resources.clone(totalRes));
-
- // TODO, add settings of killable resources when necessary
- when(sn.getTotalKillableResources()).thenReturn(Resources.none());
-
- List<RMContainer> liveContainers = new ArrayList<>();
- when(sn.getCopiedListOfRunningContainers()).thenReturn(liveContainers);
-
- nodeIdToSchedulerNodes.put(nodeId, sn);
-
- LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition);
- }
- }
-
- /**
- * Format is:
- * <pre>
- * partition0=total_resource,exclusivity;
- * partition1=total_resource,exclusivity;
- * ...
- * </pre>
- */
- private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException {
- String[] partitionConfigArr = nodeLabelsConfigStr.split(";");
- clusterResource = Resources.createResource(0);
- for (String p : partitionConfigArr) {
- String partitionName = p.substring(0, p.indexOf("="));
- Resource res = parseResourceFromString(p.substring(p.indexOf("=") + 1,
- p.indexOf(",")));
- boolean exclusivity =
- Boolean.valueOf(p.substring(p.indexOf(",") + 1, p.length()));
- when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
- .thenReturn(res);
- when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
-
- // add to partition to resource
- partitionToResource.put(partitionName, res);
- LOG.debug("add partition=" + partitionName + " totalRes=" + res
- + " exclusivity=" + exclusivity);
- Resources.addTo(clusterResource, res);
- }
-
- when(nlm.getClusterNodeLabelNames()).thenReturn(
- partitionToResource.keySet());
- }
-
- private Resource parseResourceFromString(String p) {
- String[] resource = p.split(":");
- Resource res;
- if (resource.length == 1) {
- res = Resources.createResource(Integer.valueOf(resource[0]));
- } else {
- res = Resources.createResource(Integer.valueOf(resource[0]),
- Integer.valueOf(resource[1]));
- if (resource.length > 2) {
- // Using the same order of resources from ResourceUtils, set resource
- // informations.
- ResourceInformation[] storedResourceInfo = ResourceUtils
- .getResourceTypesArray();
- for (int i = 2; i < resource.length; i++) {
- res.setResourceInformation(storedResourceInfo[i].getName(),
- ResourceInformation.newInstance(storedResourceInfo[i].getName(),
- storedResourceInfo[i].getUnits(),
- Integer.valueOf(resource[i])));
- }
- }
- }
- return res;
- }
-
- /**
- * Format is:
- * <pre>
- * root (<partition-name-1>=[guaranteed max used pending (reserved)],<partition-name-2>=..);
- * -A(...);
- * --A1(...);
- * --A2(...);
- * -B...
- * </pre>
- * ";" splits queues, and there should no empty lines, no extra spaces
- *
- * For each queue, it has configurations to specify capacities (to each
- * partition), format is:
- * <pre>
- * -<queueName> (<labelName1>=[guaranteed max used pending], \
- * <labelName2>=[guaranteed max used pending])
- * {key1=value1,key2=value2}; // Additional configs
- * </pre>
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private ParentQueue mockQueueHierarchy(String queueExprs) {
- String[] queueExprArray = queueExprs.split(";");
- ParentQueue rootQueue = null;
- for (int idx = 0; idx < queueExprArray.length; idx++) {
- String q = queueExprArray[idx];
- CSQueue queue;
-
- // Initialize queue
- if (isParent(queueExprArray, idx)) {
- ParentQueue parentQueue = mock(ParentQueue.class);
- queue = parentQueue;
- List<CSQueue> children = new ArrayList<CSQueue>();
- when(parentQueue.getChildQueues()).thenReturn(children);
- QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
- when(policy.getConfigName()).thenReturn(
- CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
- when(parentQueue.getQueueOrderingPolicy()).thenReturn(policy);
- } else {
- LeafQueue leafQueue = mock(LeafQueue.class);
- final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
- new Comparator<FiCaSchedulerApp>() {
- @Override
- public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
- if (a1.getPriority() != null
- && !a1.getPriority().equals(a2.getPriority())) {
- return a1.getPriority().compareTo(a2.getPriority());
- }
-
- int res = a1.getApplicationId()
- .compareTo(a2.getApplicationId());
- return res;
- }
- });
- when(leafQueue.getApplications()).thenReturn(apps);
- when(leafQueue.getAllApplications()).thenReturn(apps);
- OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
- String opName = conf.get(CapacitySchedulerConfiguration.PREFIX
- + CapacitySchedulerConfiguration.ROOT + "." + getQueueName(q)
- + ".ordering-policy", "fifo");
- if (opName.equals("fair")) {
- so = Mockito.spy(new FairOrderingPolicy<FiCaSchedulerApp>());
- }
- when(so.getPreemptionIterator()).thenAnswer(new Answer() {
- public Object answer(InvocationOnMock invocation) {
- return apps.descendingIterator();
- }
- });
- when(leafQueue.getOrderingPolicy()).thenReturn(so);
-
- Map<String, TreeSet<RMContainer>> ignorePartitionContainers =
- new HashMap<>();
- when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
- ignorePartitionContainers);
- queue = leafQueue;
- }
-
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- when(queue.getReadLock()).thenReturn(lock.readLock());
- setupQueue(queue, q, queueExprArray, idx);
- if (queue.getQueuePath().equals(ROOT)) {
- rootQueue = (ParentQueue) queue;
- }
- }
- return rootQueue;
- }
-
- private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
- int idx) {
- LOG.debug("*** Setup queue, source=" + q);
- String queuePath = null;
-
- int myLevel = getLevel(q);
- if (0 == myLevel) {
- // It's root
- when(queue.getQueuePath()).thenReturn(ROOT);
- queuePath = ROOT;
- }
-
- String queueName = getQueueName(q);
- when(queue.getQueueName()).thenReturn(queueName);
-
- // Setup parent queue, and add myself to parentQueue.children-list
- ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
- if (null != parentQueue) {
- when(queue.getParent()).thenReturn(parentQueue);
- parentQueue.getChildQueues().add(queue);
-
- // Setup my path
- queuePath = parentQueue.getQueuePath() + "." + queueName;
- }
- when(queue.getQueuePath()).thenReturn(queuePath);
-
- QueueCapacities qc = new QueueCapacities(0 == myLevel);
- ResourceUsage ru = new ResourceUsage();
- QueueResourceQuotas qr = new QueueResourceQuotas();
-
- when(queue.getQueueCapacities()).thenReturn(qc);
- when(queue.getQueueResourceUsage()).thenReturn(ru);
- when(queue.getQueueResourceQuotas()).thenReturn(qr);
-
- LOG.debug("Setup queue, short name=" + queue.getQueueName() + " path="
- + queue.getQueuePath());
- LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
- .getQueuePath()));
-
- // Setup other fields like used resource, guaranteed resource, etc.
- String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
- for (String s : capacitySettingStr.split(",")) {
- String partitionName = s.substring(0, s.indexOf("="));
- String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
- // Add a small epsilon to capacities to avoid truncate when doing
- // Resources.multiply
- float epsilon = 1e-6f;
- Resource totResoucePerPartition = partitionToResource.get(partitionName);
- float absGuaranteed = Resources.divide(rc, totResoucePerPartition,
- parseResourceFromString(values[0].trim()), totResoucePerPartition)
- + epsilon;
- float absMax = Resources.divide(rc, totResoucePerPartition,
- parseResourceFromString(values[1].trim()), totResoucePerPartition)
- + epsilon;
- float absUsed = Resources.divide(rc, totResoucePerPartition,
- parseResourceFromString(values[2].trim()), totResoucePerPartition)
- + epsilon;
- float used = Resources.divide(rc, totResoucePerPartition,
- parseResourceFromString(values[2].trim()),
- parseResourceFromString(values[0].trim())) + epsilon;
- Resource pending = parseResourceFromString(values[3].trim());
- qc.setAbsoluteCapacity(partitionName, absGuaranteed);
- qc.setAbsoluteMaximumCapacity(partitionName, absMax);
- qc.setAbsoluteUsedCapacity(partitionName, absUsed);
- qc.setUsedCapacity(partitionName, used);
- qr.setEffectiveMaxResource(parseResourceFromString(values[1].trim()));
- qr.setEffectiveMinResource(parseResourceFromString(values[0].trim()));
- qr.setEffectiveMaxResource(partitionName,
- parseResourceFromString(values[1].trim()));
- qr.setEffectiveMinResource(partitionName,
- parseResourceFromString(values[0].trim()));
- when(queue.getUsedCapacity()).thenReturn(used);
- when(queue.getEffectiveCapacity(partitionName))
- .thenReturn(parseResourceFromString(values[0].trim()));
- when(queue.getEffectiveMaxCapacity(partitionName))
- .thenReturn(parseResourceFromString(values[1].trim()));
- ru.setPending(partitionName, pending);
- // Setup reserved resource if it contained by input config
- Resource reserved = Resources.none();
- if(values.length == 5) {
- reserved = parseResourceFromString(values[4].trim());
- ru.setReserved(partitionName, reserved);
- }
- if (!isParent(queueExprArray, idx)) {
- LeafQueue lq = (LeafQueue) queue;
- when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
- isA(String.class), eq(false))).thenReturn(pending);
- when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
- isA(String.class), eq(true))).thenReturn(
- Resources.subtract(pending, reserved));
- }
- ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
-
- LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
- + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
- + ",abs_used" + absUsed + ",pending_resource=" + pending
- + ", reserved_resource=" + reserved + "]");
- }
-
- // Setup preemption disabled
- when(queue.getPreemptionDisabled()).thenReturn(
- conf.getPreemptionDisabled(queuePath, false));
-
- // Setup other queue configurations
- Map<String, String> otherConfigs = getOtherConfigurations(
- queueExprArray[idx]);
- if (otherConfigs.containsKey("priority")) {
- when(queue.getPriority()).thenReturn(
- Priority.newInstance(Integer.valueOf(otherConfigs.get("priority"))));
- } else {
- // set queue's priority to 0 by default
- when(queue.getPriority()).thenReturn(Priority.newInstance(0));
- }
-
- // Setup disable preemption of queues
- if (otherConfigs.containsKey("disable_preemption")) {
- when(queue.getPreemptionDisabled()).thenReturn(
- Boolean.valueOf(otherConfigs.get("disable_preemption")));
- }
-
- //TODO: Refactor this test class to use queue path internally like CS
- // does from now on
- nameToCSQueues.put(queuePath, queue);
- nameToCSQueues.put(queueName, queue);
- when(cs.getQueue(eq(queuePath))).thenReturn(queue);
- when(cs.getQueue(eq(queueName))).thenReturn(queue);
- when(cs.normalizeQueueName(eq(queuePath))).thenReturn(queuePath);
- when(cs.normalizeQueueName(eq(queueName))).thenReturn(queuePath);
- }
-
- /**
- * Get additional queue's configurations
- * @param queueExpr queue expr
- * @return maps of configs
- */
- private Map<String, String> getOtherConfigurations(String queueExpr) {
- if (queueExpr.contains("{")) {
- int left = queueExpr.indexOf('{');
- int right = queueExpr.indexOf('}');
-
- if (right > left) {
- Map<String, String> configs = new HashMap<>();
-
- String subStr = queueExpr.substring(left + 1, right);
- for (String kv : subStr.split(",")) {
- if (kv.contains("=")) {
- String key = kv.substring(0, kv.indexOf("="));
- String value = kv.substring(kv.indexOf("=") + 1);
- configs.put(key, value);
- }
- }
-
- return configs;
- }
- }
-
- return Collections.emptyMap();
- }
-
- /**
- * Level of a queue is how many "-" at beginning, root's level is 0
- */
- private int getLevel(String q) {
- int level = 0; // level = how many "-" at beginning
- while (level < q.length() && q.charAt(level) == '-') {
- level++;
- }
- return level;
- }
-
- private String getQueueName(String q) {
- int idx = 0;
- // find first != '-' char
- while (idx < q.length() && q.charAt(idx) == '-') {
- idx++;
- }
- if (idx == q.length()) {
- throw new IllegalArgumentException("illegal input:" + q);
- }
- // name = after '-' and before '('
- String name = q.substring(idx, q.indexOf('('));
- if (name.isEmpty()) {
- throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
- }
- if (name.contains(".")) {
- throw new IllegalArgumentException("queue name shouldn't contain '.':"
- + name);
- }
- return name;
- }
-
- private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) {
- idx--;
- while (idx >= 0) {
- int level = getLevel(queueExprArray[idx]);
- if (level < myLevel) {
- String parentQueuName = getQueueName(queueExprArray[idx]);
- return (ParentQueue) nameToCSQueues.get(parentQueuName);
- }
- idx--;
- }
-
- return null;
- }
-
- /**
- * Get if a queue is ParentQueue
- */
- private boolean isParent(String[] queues, int idx) {
- int myLevel = getLevel(queues[idx]);
- idx++;
- while (idx < queues.length && getLevel(queues[idx]) == myLevel) {
- idx++;
- }
- if (idx >= queues.length || getLevel(queues[idx]) < myLevel) {
- // It's a LeafQueue
- return false;
- } else {
- return true;
- }
- }
-
- public ApplicationAttemptId getAppAttemptId(int id) {
- ApplicationId appId = ApplicationId.newInstance(0L, id);
- ApplicationAttemptId appAttemptId =
- ApplicationAttemptId.newInstance(appId, 1);
- return appAttemptId;
- }
-
- public void checkContainerNodesInApp(FiCaSchedulerApp app,
- int expectedContainersNumber, String host) {
- NodeId nodeId = NodeId.newInstance(host, 1);
- int num = 0;
- for (RMContainer c : app.getLiveContainers()) {
- if (c.getAllocatedNode().equals(nodeId)) {
- num++;
- }
- }
- for (RMContainer c : app.getReservedContainers()) {
- if (c.getAllocatedNode().equals(nodeId)) {
- num++;
- }
- }
- Assert.assertEquals(expectedContainersNumber, num);
- }
-
- public FiCaSchedulerApp getApp(String queueName, int appId) {
- for (FiCaSchedulerApp app : ((LeafQueue) cs.getQueue(queueName))
- .getApplications()) {
- if (app.getApplicationId().getId() == appId) {
- return app;
- }
- }
- return null;
- }
-
- public void checkAbsCapacities(CSQueue queue, String partition,
- float guaranteed, float max, float used) {
- QueueCapacities qc = queue.getQueueCapacities();
- Assert.assertEquals(guaranteed, qc.getAbsoluteCapacity(partition), 1e-3);
- Assert.assertEquals(max, qc.getAbsoluteMaximumCapacity(partition), 1e-3);
- Assert.assertEquals(used, qc.getAbsoluteUsedCapacity(partition), 1e-3);
- }
-
- public void checkPendingResource(CSQueue queue, String partition, int pending) {
- ResourceUsage ru = queue.getQueueResourceUsage();
- Assert.assertEquals(pending, ru.getPending(partition).getMemorySize());
- }
-
- public void checkPriority(CSQueue queue, int expectedPriority) {
- Assert.assertEquals(expectedPriority, queue.getPriority().getPriority());
- }
-
- public void checkReservedResource(CSQueue queue, String partition, int reserved) {
- ResourceUsage ru = queue.getQueueResourceUsage();
- Assert.assertEquals(reserved, ru.getReserved(partition).getMemorySize());
- }
-
- static class IsPreemptionRequestForQueueAndNode
- implements ArgumentMatcher<ContainerPreemptEvent> {
- private final ApplicationAttemptId appAttId;
- private final String queueName;
- private final NodeId nodeId;
-
- IsPreemptionRequestForQueueAndNode(ApplicationAttemptId appAttId,
- String queueName, NodeId nodeId) {
- this.appAttId = appAttId;
- this.queueName = queueName;
- this.nodeId = nodeId;
- }
- @Override
- public boolean matches(ContainerPreemptEvent cpe) {
- return appAttId.equals(cpe.getAppId())
- && queueName.equals(cpe.getContainer().getQueueName())
- && nodeId.equals(cpe.getContainer().getAllocatedNode());
- }
- @Override
- public String toString() {
- return appAttId.toString();
- }
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
index f9fda99..1f82617 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@@ -39,7 +40,7 @@ public class TestPreemptionForQueueWithPriorities
extends ProportionalCapacityPreemptionPolicyMockFramework {
@Before
public void setup() {
- rc = new DefaultResourceCalculator();
+ resourceCalculator = new DefaultResourceCalculator();
super.setup();
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
}
@@ -82,13 +83,13 @@ public class TestPreemptionForQueueWithPriorities
// 10 preempted from app1, 15 preempted from app2, and nothing preempted
// from app3
- verify(mDisp, times(10)).handle(argThat(
+ verify(eventHandler, times(10)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(15)).handle(argThat(
+ verify(eventHandler, times(15)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@@ -135,13 +136,13 @@ public class TestPreemptionForQueueWithPriorities
// 10 preempted from app1, 15 preempted from app2, and nothing preempted
// from app3
- verify(mDisp, times(10)).handle(argThat(
+ verify(eventHandler, times(10)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(15)).handle(argThat(
+ verify(eventHandler, times(15)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@@ -185,10 +186,10 @@ public class TestPreemptionForQueueWithPriorities
policy.editSchedule();
// Nothing preempted
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -236,16 +237,16 @@ public class TestPreemptionForQueueWithPriorities
// from app3/app4
// (After preemption, a has 35 - 23 = 12, b has 25 - 6 = 19, so a:b after
// preemption is 1.58, close to 1.50)
- verify(mDisp, times(23)).handle(argThat(
+ verify(eventHandler, times(23)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(6)).handle(argThat(
+ verify(eventHandler, times(6)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
@@ -295,16 +296,16 @@ public class TestPreemptionForQueueWithPriorities
// We suppose to preempt some resource from A, but now since queueA
// disables preemption, so we need to preempt some resource from B and
// some from C even if C has higher priority than A
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(9)).handle(argThat(
+ verify(eventHandler, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, times(19)).handle(argThat(
+ verify(eventHandler, times(19)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
@@ -352,16 +353,16 @@ public class TestPreemptionForQueueWithPriorities
// Preemption should first divide capacities between a / b, and b2 should
// get less preemption than b1 (because b2 has higher priority)
- verify(mDisp, times(6)).handle(argThat(
+ verify(eventHandler, times(6)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(1)).handle(argThat(
+ verify(eventHandler, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, times(13)).handle(argThat(
+ verify(eventHandler, times(13)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
- verify(mDisp, times(10)).handle(argThat(
+ verify(eventHandler, times(10)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
@@ -408,16 +409,16 @@ public class TestPreemptionForQueueWithPriorities
// Preemption should first divide capacities between a / b, and b1 should
// get less preemption than b2 (because b1 has higher priority)
- verify(mDisp, times(3)).handle(argThat(
+ verify(eventHandler, times(3)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
- verify(mDisp, times(2)).handle(argThat(
+ verify(eventHandler, times(2)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
@@ -474,23 +475,23 @@ public class TestPreemptionForQueueWithPriorities
// get less preemption than a1 (because a2 has higher priority). More
// specifically, a2 will not get preempted since the resource preempted
// from a1 can satisfy b already.
- verify(mDisp, times(7)).handle(argThat(
+ verify(eventHandler, times(7)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void test3ResourceTypesInterQueuePreemption() throws IOException {
- rc = new DominantResourceCalculator();
- when(cs.getResourceCalculator()).thenReturn(rc);
+ resourceCalculator = new DominantResourceCalculator();
+ when(cs.getResourceCalculator()).thenReturn(resourceCalculator);
// Initialize resource map
String RESOURCE_1 = "res1";
@@ -535,7 +536,7 @@ public class TestPreemptionForQueueWithPriorities
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(1)).handle(argThat(
+ verify(eventHandler, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@@ -589,10 +590,10 @@ public class TestPreemptionForQueueWithPriorities
// b and c has same relativeAssigned=1.0f(idealAssigned / guaranteed),
// since c has higher priority, c will be put in mostUnderServedQueue and
// get all remain 30 capacity.
- verify(mDisp, times(10)).handle(argThat(
+ verify(eventHandler, times(10)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index 0300e19..96e91d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Before;
import org.junit.Test;
@@ -90,13 +91,13 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
// 30 preempted from app1, 30 preempted from app4, and nothing preempted
// from app2/app3
- verify(mDisp, times(30)).handle(
+ verify(eventHandler, times(30)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
- verify(mDisp, times(30)).handle(
+ verify(eventHandler, times(30)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
- verify(mDisp, never()).handle(
+ verify(eventHandler, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
- verify(mDisp, never()).handle(
+ verify(eventHandler, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
}
@@ -146,9 +147,9 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
policy.editSchedule();
// No preemption happens
- verify(mDisp, never()).handle(
+ verify(eventHandler, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
- verify(mDisp, never()).handle(
+ verify(eventHandler, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
}
@@ -195,9 +196,9 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
// 30 preempted from app1, 30 preempted from app4, and nothing preempted
// from app2/app3
- verify(mDisp, times(20)).handle(
+ verify(eventHandler, times(20)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
- verify(mDisp, times(30)).handle(
+ verify(eventHandler, times(30)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
}
@@ -247,16 +248,16 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
policy.editSchedule();
// 4 from app1
- verify(mDisp, times(4)).handle(
+ verify(eventHandler, times(4)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
// 19 from app2-app5
- verify(mDisp, times(19)).handle(
+ verify(eventHandler, times(19)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
- verify(mDisp, times(19)).handle(
+ verify(eventHandler, times(19)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
- verify(mDisp, times(19)).handle(
+ verify(eventHandler, times(19)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
- verify(mDisp, times(19)).handle(
+ verify(eventHandler, times(19)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(5))));
}
@@ -308,16 +309,16 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
policy.editSchedule();
// 4 from app1
- verify(mDisp, times(19)).handle(
+ verify(eventHandler, times(19)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
// 19 from app2-app5
- verify(mDisp, times(19)).handle(
+ verify(eventHandler, times(19)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
- verify(mDisp, times(19)).handle(
+ verify(eventHandler, times(19)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
- verify(mDisp, times(20)).handle(
+ verify(eventHandler, times(20)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
- verify(mDisp, times(20)).handle(
+ verify(eventHandler, times(20)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(5))));
}
@@ -370,11 +371,11 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
policy.editSchedule();
// 10 preempted from app1, nothing preempted from app2-app3
- verify(mDisp, times(10)).handle(
+ verify(eventHandler, times(10)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
- verify(mDisp, never()).handle(
+ verify(eventHandler, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
- verify(mDisp, never()).handle(
+ verify(eventHandler, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
}
@@ -429,11 +430,11 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
policy.editSchedule();
// 15 will be preempted app2/app3
- verify(mDisp, times(15)).handle(
+ verify(eventHandler, times(15)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
- verify(mDisp, times(15)).handle(
+ verify(eventHandler, times(15)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
- verify(mDisp, never()).handle(
+ verify(eventHandler, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
}
@@ -499,13 +500,13 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
policy.editSchedule();
// 10 will be preempted from app1 (a1) /app4 (b2)
- verify(mDisp, times(10)).handle(
+ verify(eventHandler, times(10)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
- verify(mDisp, times(10)).handle(
+ verify(eventHandler, times(10)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
- verify(mDisp, never()).handle(
+ verify(eventHandler, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
- verify(mDisp, never()).handle(
+ verify(eventHandler, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
}
@@ -559,9 +560,9 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(50)).handle(
+ verify(eventHandler, times(50)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
- verify(mDisp, times(30)).handle(
+ verify(eventHandler, times(30)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
}
@@ -610,13 +611,13 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
// 30 preempted from app1, 30 preempted from app4, and nothing preempted
// from app2/app3
- verify(mDisp, times(30)).handle(
+ verify(eventHandler, times(30)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
- verify(mDisp, times(30)).handle(
+ verify(eventHandler, times(30)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
- verify(mDisp, never()).handle(
+ verify(eventHandler, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
- verify(mDisp, never()).handle(
+ verify(eventHandler, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
}
@@ -683,7 +684,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(7)).handle(
+ verify(eventHandler, times(7)).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
riMap.remove(RESOURCE_1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java
index 5410931..6c72349 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.Before;
import org.junit.Test;
@@ -82,14 +83,14 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
// Total 5 preempted from app1 at n1, don't preempt container from other
// app/node
- verify(mDisp, times(5)).handle(argThat(
+ verify(eventHandler, times(5)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(5)).handle(
+ verify(eventHandler, times(5)).handle(
argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1),
"root.a",
NodeId.newInstance("n1", 1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -136,16 +137,16 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(15)).handle(argThat(
+ verify(eventHandler, times(15)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(10)).handle(
+ verify(eventHandler, times(10)).handle(
argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
NodeId.newInstance("n1", 1))));
- verify(mDisp, times(5)).handle(
+ verify(eventHandler, times(5)).handle(
argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
NodeId.newInstance("n2", 1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -198,16 +199,16 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(15)).handle(argThat(
+ verify(eventHandler, times(15)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(
+ verify(eventHandler, times(0)).handle(
argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
NodeId.newInstance("n1", 1))));
- verify(mDisp, times(15)).handle(
+ verify(eventHandler, times(15)).handle(
argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
NodeId.newInstance("n2", 1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -256,10 +257,10 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -308,10 +309,10 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
// Total 4 preempted from app1 at n1, don't preempt container from other
// app/node
- verify(mDisp, times(4)).handle(argThat(
+ verify(eventHandler, times(4)).handle(argThat(
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "root.a",
NodeId.newInstance("n1", 1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "root.a",
NodeId.newInstance("n2", 1))));
}
@@ -361,10 +362,10 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
// Total 4 preempted from app1 at n1, don't preempt container from other
// app/node
- verify(mDisp, times(4)).handle(argThat(
+ verify(eventHandler, times(4)).handle(argThat(
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "root.a",
NodeId.newInstance("n1", 1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "root.a",
NodeId.newInstance("n2", 1))));
}
@@ -415,16 +416,16 @@ public class TestProportionalCapacityPreemptionPolicyForReservedContainers
policy.editSchedule();
// No preemption should happen
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
NodeId.newInstance("n1", 1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
NodeId.newInstance("n2", 1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
NodeId.newInstance("n3", 1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
NodeId.newInstance("n4", 1))));
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
index 7e3d686..c42867a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -40,8 +41,8 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
@Before
public void setup() {
super.setup();
- rc = new DominantResourceCalculator();
- when(cs.getResourceCalculator()).thenReturn(rc);
+ resourceCalculator = new DominantResourceCalculator();
+ when(cs.getResourceCalculator()).thenReturn(resourceCalculator);
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
}
@@ -75,10 +76,10 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
policy.editSchedule();
// Preemption should happen in Queue b, preempt <10,20> to Queue a
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(5)).handle(argThat(
+ verify(eventHandler, times(5)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -121,7 +122,7 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(1)).handle(argThat(
+ verify(eventHandler, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@@ -166,10 +167,10 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(1)).handle(argThat(
+ verify(eventHandler, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
index e2336fb..8077962 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.Before;
import org.junit.Test;
@@ -35,7 +36,7 @@ import static org.mockito.Mockito.verify;
*/
public class TestProportionalCapacityPreemptionPolicyIntraQueue
extends
- ProportionalCapacityPreemptionPolicyMockFramework {
+ ProportionalCapacityPreemptionPolicyMockFramework {
@Before
public void setup() {
super.setup();
@@ -101,10 +102,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
// For queue B, app3 and app4 were of lower priority. Hence take 8
// containers from them by hitting the intraQueuePreemptionDemand of 20%.
- verify(mDisp, times(1)).handle(argThat(
+ verify(eventHandler, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
- verify(mDisp, times(7)).handle(argThat(
+ verify(eventHandler, times(7)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@@ -156,10 +157,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@@ -215,16 +216,16 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
policy.editSchedule();
// For queue B, none of the apps should be preempted.
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(5))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(6))));
}
@@ -272,16 +273,16 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
// For queue A/B, none of the apps should be preempted as used capacity
// is under 50%.
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
@@ -337,7 +338,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
// For queueB, eventhough app4 needs 100 resources, only 30 resources were
// preempted. (max is 50% of guaranteed cap of any queue
// "maxIntraQueuePreemptable")
- verify(mDisp, times(30)).handle(argThat(
+ verify(eventHandler, times(30)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@@ -391,7 +392,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
// For queue B eventhough app4 needs 100 resources, only 10 resources were
// preempted. This is the 10% limit of TOTAL_PREEMPTION_PER_ROUND.
- verify(mDisp, times(10)).handle(argThat(
+ verify(eventHandler, times(10)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@@ -448,10 +449,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
// As per intra queue preemption algorithm, 20 more containers were needed
// for app2 (in queue a). Inter queue pre-emption had already preselected 9
// containers and hence preempted only 11 more.
- verify(mDisp, times(20)).handle(argThat(
+ verify(eventHandler, times(20)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -502,10 +503,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
policy.editSchedule();
// Ensure that only 9 containers are preempted from app2 (sparing 1 AM)
- verify(mDisp, times(11)).handle(argThat(
+ verify(eventHandler, times(11)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(9)).handle(argThat(
+ verify(eventHandler, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -552,10 +553,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
policy.editSchedule();
// Make sure that app1's Am container is spared. Only 9/10 is preempted.
- verify(mDisp, times(9)).handle(argThat(
+ verify(eventHandler, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, never()).handle(argThat(
+ verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -595,7 +596,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
policy.editSchedule();
// Ensure there are 0 preemptions since only one app is running in queue.
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@@ -638,13 +639,13 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
policy.editSchedule();
// Complete demand request from QueueB for 20 resource must be preempted.
- verify(mDisp, times(20)).handle(argThat(
+ verify(eventHandler, times(20)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@@ -703,11 +704,11 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
policy.editSchedule();
// 20 preempted from app1
- verify(mDisp, times(20))
+ verify(eventHandler, times(20))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
- verify(mDisp, never())
+ verify(eventHandler, never())
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
- verify(mDisp, never())
+ verify(eventHandler, never())
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
}
@@ -785,26 +786,26 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
// High priority app in queueA has 30 resource demand. But low priority
// app has only 5 resource. Hence preempt 4 here sparing AM.
- verify(mDisp, times(4)).handle(argThat(
+ verify(eventHandler, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
// Multiple high priority apps has demand of 17. This will be preempted
// from another set of low priority apps.
- verify(mDisp, times(4)).handle(argThat(
+ verify(eventHandler, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
- verify(mDisp, times(9)).handle(argThat(
+ verify(eventHandler, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(6))));
- verify(mDisp, times(4)).handle(argThat(
+ verify(eventHandler, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(5))));
// Only 3 resources will be freed in this round for queue C as we
// are trying to save AM container.
- verify(mDisp, times(2)).handle(argThat(
+ verify(eventHandler, times(2)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(10))));
- verify(mDisp, times(1)).handle(argThat(
+ verify(eventHandler, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(11))));
}
@@ -861,7 +862,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
// 14 more (5 is already running) eventhough demand is for 30. Ideally we
// must preempt 15. But 15th container will bring user1's usage to 20 which
// is same as user-limit. Hence skip 15th container.
- verify(mDisp, times(14)).handle(argThat(
+ verify(eventHandler, times(14)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@@ -927,19 +928,19 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
// Label X: app3 has demand of 20 for label X. Hence app2 will loose
// 4 (sparing AM) and 16 more from app1 till preemption limit is met.
- verify(mDisp, times(16))
+ verify(eventHandler, times(16))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
- verify(mDisp, times(4))
+ verify(eventHandler, times(4))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
// Default Label:For a demand of 30, preempt from all low priority
// apps of default label. 25 will be preempted as preemption limit is
// met.
- verify(mDisp, times(1))
+ verify(eventHandler, times(1))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(8))));
- verify(mDisp, times(2))
+ verify(eventHandler, times(2))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(7))));
- verify(mDisp, times(22))
+ verify(eventHandler, times(22))
.handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6))));
}
@@ -1017,10 +1018,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
// For queue B, app3 and app4 were of lower priority. Hence take 8
// containers from them by hitting the intraQueuePreemptionDemand of 20%.
- verify(mDisp, times(1)).handle(argThat(
+ verify(eventHandler, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
- verify(mDisp, times(7)).handle(argThat(
+ verify(eventHandler, times(7)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
index 6e56cb5..eb9d218 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.verify;
import java.io.IOException;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.Before;
import org.junit.Test;
@@ -85,7 +86,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(20)).handle(argThat(
+ verify(eventHandler, times(20)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@@ -133,7 +134,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(5)).handle(argThat(
+ verify(eventHandler, times(5)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
@@ -156,7 +157,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(15)).handle(argThat(
+ verify(eventHandler, times(15)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@@ -208,7 +209,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
- verify(mDisp, times(20)).handle(argThat(
+ verify(eventHandler, times(20)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@@ -258,18 +259,18 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering
// app3 is the younges and also over its user limit. 5 should be preempted
// from app3 until it comes down to user3's user limit.
- verify(mDisp, times(5)).handle(argThat(
+ verify(eventHandler, times(5)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
// User1's app2 is its youngest. 19 should be preempted from app2, leaving
// only the AM
- verify(mDisp, times(19)).handle(argThat(
+ verify(eventHandler, times(19)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
// Preempt the remaining resource from User1's oldest app1.
- verify(mDisp, times(1)).handle(argThat(
+ verify(eventHandler, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
index ba5f4d1..9fb08cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.Before;
import org.junit.Test;
@@ -33,7 +34,7 @@ import static org.mockito.Mockito.verify;
*/
public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
extends
- ProportionalCapacityPreemptionPolicyMockFramework {
+ ProportionalCapacityPreemptionPolicyMockFramework {
@Before
public void setup() {
super.setup();
@@ -94,7 +95,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2 needs more resource and its well under its user-limit. Hence preempt
// resources from app1.
- verify(mDisp, times(30)).handle(argThat(
+ verify(eventHandler, times(30)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@@ -149,7 +150,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2 needs more resource. Since app1,2 are from same user, there wont be
// any preemption.
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@@ -206,7 +207,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2 needs more resource. Since app1,2 are from same user, there wont be
// any preemption.
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@@ -262,7 +263,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2 needs more resource and its well under its user-limit. Hence preempt
// resources from app1 even though its priority is more than app2.
- verify(mDisp, times(30)).handle(argThat(
+ verify(eventHandler, times(30)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@@ -323,7 +324,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2/app4 needs more resource and its well under its user-limit. Hence
// preempt resources from app3 (compare to app1, app3 has low priority).
- verify(mDisp, times(9)).handle(argThat(
+ verify(eventHandler, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@@ -384,16 +385,16 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2/app4 needs more resource and its well under its user-limit. Hence
// preempt resources from app3 (compare to app1, app3 has low priority).
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
@@ -451,7 +452,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2/app4 needs more resource and its well under its user-limit. Hence
// preempt resources from app1 (compare to app3, app1 has low priority).
- verify(mDisp, times(9)).handle(argThat(
+ verify(eventHandler, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@@ -508,10 +509,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2/app4 needs more resource and its well under its user-limit. Hence
// preempt resources from app1 (compare to app3, app1 has low priority).
- verify(mDisp, times(4)).handle(argThat(
+ verify(eventHandler, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(4)).handle(argThat(
+ verify(eventHandler, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(5))));
}
@@ -566,10 +567,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2 needs more resource and its well under its user-limit. Hence preempt
// resources from app1.
- verify(mDisp, times(9)).handle(argThat(
+ verify(eventHandler, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -626,10 +627,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2/app4 needs more resource and its well under its user-limit. Hence
// preempt resources from app1 (compare to app3, app1 has low priority).
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -686,10 +687,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2/app4 needs more resource and its well under its user-limit. Hence
// preempt resources from app1 (compare to app3, app1 has low priority).
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -746,10 +747,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2/app4 needs more resource and its well under its user-limit. Hence
// preempt resources from app1 (compare to app3, app1 has low priority).
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -806,10 +807,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2 needs more resource and its well under its user-limit. Hence preempt
// 3 resources (9GB) from app1. We will not preempt last container as it may
// pull user's usage under its user-limit.
- verify(mDisp, times(3)).handle(argThat(
+ verify(eventHandler, times(3)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@@ -868,10 +869,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2 needs more resource and its well under its user-limit. Hence preempt
// 3 resources (9GB) from app1. We will not preempt last container as it may
// pull user's usage under its user-limit.
- verify(mDisp, times(3)).handle(argThat(
+ verify(eventHandler, times(3)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
+ verify(eventHandler, times(0)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
@@ -892,7 +893,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2 has priority demand within same user 'user1'. However user1's used
// is alredy under UL. Hence no preemption. We will still get 3 container
// while asserting as it was aleady selected in earlier round.
- verify(mDisp, times(3)).handle(argThat(
+ verify(eventHandler, times(3)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
@@ -927,7 +928,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
// app2 is right at its user limit and app1 needs one resource. Should
// preempt 1 container.
- verify(mDisp, times(1)).handle(argThat(
+ verify(eventHandler, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java
index 1f74487..940d11c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.junit.Before;
@@ -35,14 +36,14 @@ import static org.mockito.Mockito.when;
*/
public class TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF
extends
- ProportionalCapacityPreemptionPolicyMockFramework {
+ ProportionalCapacityPreemptionPolicyMockFramework {
@Before
public void setup() {
super.setup();
conf.setBoolean(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
- rc = new DominantResourceCalculator();
- when(cs.getResourceCalculator()).thenReturn(rc);
+ resourceCalculator = new DominantResourceCalculator();
+ when(cs.getResourceCalculator()).thenReturn(resourceCalculator);
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
}
@@ -102,10 +103,10 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF
// For queue B, app3 and app4 were of lower priority. Hence take 8
// containers from them by hitting the intraQueuePreemptionDemand of 20%.
- verify(mDisp, times(1)).handle(argThat(
+ verify(eventHandler, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
- verify(mDisp, times(3)).handle(argThat(
+ verify(eventHandler, times(3)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@@ -165,13 +166,13 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF
// For queue B, app3 and app4 were of lower priority. Hence take 4
// containers.
- verify(mDisp, times(9)).handle(argThat(
+ verify(eventHandler, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
- verify(mDisp, times(4)).handle(argThat(
+ verify(eventHandler, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
- verify(mDisp, times(4)).handle(argThat(
+ verify(eventHandler, times(4)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(5))));
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
index 964a230..d59c854 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java
index ec71e67..2e7b01e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.log4j.Level;
@@ -57,10 +58,10 @@ public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
policy.editSchedule();
// I_A: A:30 B:35 C:35, preempt 5 from B and 15 from C to A
- verify(mDisp, times(5)).handle(argThat(
+ verify(eventHandler, times(5)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, times(15)).handle(argThat(
+ verify(eventHandler, times(15)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
@@ -99,10 +100,10 @@ public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
policy.editSchedule();
// I_A: A:33 B:33 C:33, preempt 7 from B and 17 from C to A
- verify(mDisp, times(7)).handle(argThat(
+ verify(eventHandler, times(7)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
- verify(mDisp, times(17)).handle(argThat(
+ verify(eventHandler, times(17)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
@@ -142,7 +143,7 @@ public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
policy.editSchedule();
// I_A: A:15 B:42 C:43, preempt 7 from B and 17 from C to A
- verify(mDisp, times(8)).handle(argThat(
+ verify(eventHandler, times(8)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
@@ -178,7 +179,7 @@ public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
policy.editSchedule();
// 21 containers will be preempted here
- verify(mDisp, times(21)).handle(argThat(
+ verify(eventHandler, times(21)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.
IsPreemptionRequestFor(getAppAttemptId(2))));
@@ -240,7 +241,7 @@ public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
assertEquals(hasFifoSelector, true);
// 21 containers will be preempted here
- verify(mDisp, times(21)).handle(argThat(
+ verify(eventHandler, times(21)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.
IsPreemptionRequestFor(getAppAttemptId(2))));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ContainerSpecification.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ContainerSpecification.java
new file mode 100644
index 0000000..eb24c1a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ContainerSpecification.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework.parseResourceFromString;
+
+public class ContainerSpecification {
+ Priority priority;
+ Resource resource = Resource.newInstance(0, 0);
+ Resource pendingResource = Resource.newInstance(0, 0);
+ NodeId nodeId;
+ String label;
+ int repeat;
+ boolean reserved;
+ String username;
+
+ private ContainerSpecification(Builder builder) {
+ if (builder.resource != null) {
+ Resources.addTo(resource, builder.resource);
+ }
+ if (builder.pendingResource != null) {
+ Resources.addTo(pendingResource, builder.pendingResource);
+ }
+ this.priority = builder.priority;
+ this.nodeId = builder.nodeId;
+ this.label = builder.label;
+ this.repeat = builder.repeat;
+ this.reserved = builder.reserved;
+ this.username = builder.username;
+ }
+
+ static class Builder {
+ private Priority priority;
+ private Resource resource;
+ private NodeId nodeId;
+ private String label;
+ private int repeat;
+ private boolean reserved;
+ private Resource pendingResource;
+ private String username = "user";
+
+ public static Builder create() {
+ return new Builder();
+ }
+
+ Builder withPriority(String value) {
+ this.priority = Priority.newInstance(Integer.valueOf(value));
+ return this;
+ }
+
+ Builder withResource(String value) {
+ this.resource = parseResourceFromString(value);
+ return this;
+ }
+
+ Builder withHostname(String value) {
+ this.nodeId = NodeId.newInstance(value, 1);
+ return this;
+ }
+
+ Builder withLabel(String value) {
+ this.label = value;
+ return this;
+ }
+
+ Builder withRepeat(String repeat) {
+ this.repeat = Integer.valueOf(repeat);
+ return this;
+ }
+
+ Builder withReserved(String value) {
+ this.reserved = Boolean.valueOf(value);
+ return this;
+ }
+
+ Builder withPendingResource(String value) {
+ this.pendingResource = parseResourceFromString(value);
+ return this;
+ }
+
+ Builder withUsername(String value) {
+ this.username = value;
+ return this;
+ }
+
+ public ContainerSpecification build() {
+ return new ContainerSpecification(this);
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplication.java
new file mode 100644
index 0000000..f40b4a0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplication.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class MockApplication {
+ private static final Logger LOG = LoggerFactory.getLogger(MockApplication.class);
+ private List<RMContainer> liveContainers = new ArrayList<>();
+ private List<RMContainer> reservedContainers = new ArrayList<>();
+
+ private ApplicationId appId;
+ final String containersConfig;
+ final String queueName;
+ ApplicationAttemptId appAttemptId;
+ FiCaSchedulerApp app;
+
+ MockApplication(int id, String containersConfig, String queueName) {
+ this.appId = ApplicationId.newInstance(0L, id);
+ this.containersConfig = containersConfig;
+ this.queueName = queueName;
+
+ //dynamic fields
+ this.appAttemptId = ApplicationAttemptId
+ .newInstance(appId, 1);
+ //this must be the last step
+ setupInitialMocking(queueName);
+ }
+
+ private void setupInitialMocking(String queueName) {
+ this.app = mock(FiCaSchedulerApp.class);
+ when(app.getAMResource(anyString()))
+ .thenReturn(Resources.createResource(0, 0));
+ when(app.getLiveContainers()).thenReturn(liveContainers);
+ when(app.getReservedContainers()).thenReturn(reservedContainers);
+ when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
+ when(app.getApplicationId()).thenReturn(appId);
+ when(app.getQueueName()).thenReturn(queueName);
+ }
+
+ private void addLiveContainer(RMContainer c) {
+ this.liveContainers.add(c);
+ }
+
+ private void addReservedContainer(RMContainer c) {
+ this.reservedContainers.add(c);
+ }
+
+ void addMockContainer(MockContainer mockContainer,
+ FiCaSchedulerNode schedulerNode, LeafQueue queue) {
+ int containerId = mockContainer.containerId;
+ ContainerSpecification containerSpec = mockContainer.containerSpec;
+
+ if (containerId == 1) {
+ when(app.getAMResource(containerSpec.label)).thenReturn(containerSpec.resource);
+ when(app.getAppAMNodePartitionName()).thenReturn(containerSpec.label);
+ }
+
+ if (containerSpec.reserved) {
+ addReservedContainer(mockContainer.rmContainerMock);
+ } else {
+ addLiveContainer(mockContainer.rmContainerMock);
+ }
+
+ // Add container to scheduler-node
+ addContainerToSchedulerNode(schedulerNode, mockContainer.rmContainerMock, containerSpec.reserved);
+
+ // If this is a non-exclusive allocation
+ String partition = null;
+ if (containerSpec.label.isEmpty()
+ && !(partition = schedulerNode.getPartition())
+ .isEmpty()) {
+ Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = queue
+ .getIgnoreExclusivityRMContainers();
+ if (!ignoreExclusivityContainers.containsKey(partition)) {
+ ignoreExclusivityContainers.put(partition, new TreeSet<>());
+ }
+ ignoreExclusivityContainers.get(partition).add(mockContainer.rmContainerMock);
+ LOG.info("Added an ignore-exclusivity container to partition {}, new size is: {}", partition, ignoreExclusivityContainers.get(partition).size());
+
+ }
+ LOG.debug("add container to app=" + appAttemptId + " res=" + containerSpec.resource + " node="
+ + containerSpec.nodeId + " nodeLabelExpression=" + containerSpec.label + " partition="
+ + partition);
+ }
+
+ void addAggregatedContainerData(ContainerSpecification containerSpec,
+ Resource usedResources) {
+ // If app has 0 container, and it has only pending, still make sure to
+ // update label.
+ if (containerSpec.repeat == 0) {
+ when(app.getAppAMNodePartitionName()).thenReturn(containerSpec.label);
+ }
+
+ // Some more app specific aggregated data can be better filled here.
+ when(app.getPriority()).thenReturn(containerSpec.priority);
+ when(app.getUser()).thenReturn(containerSpec.username);
+ when(app.getCurrentConsumption()).thenReturn(usedResources);
+ when(app.getCurrentReservation())
+ .thenReturn(Resources.createResource(0, 0));
+
+ Map<String, Resource> pendingForDefaultPartition =
+ new HashMap<>();
+ // Add for default partition for now.
+ pendingForDefaultPartition.put(containerSpec.label, containerSpec.pendingResource);
+ when(app.getTotalPendingRequestsPerPartition())
+ .thenReturn(pendingForDefaultPartition);
+
+ // need to set pending resource in resource usage as well
+ ResourceUsage ru = Mockito.spy(new ResourceUsage());
+ ru.setUsed(containerSpec.label, usedResources);
+ when(ru.getCachedUsed(anyString())).thenReturn(usedResources);
+ when(app.getAppAttemptResourceUsage()).thenReturn(ru);
+ when(app.getSchedulingResourceUsage()).thenReturn(ru);
+ }
+
+ private void addContainerToSchedulerNode(SchedulerNode node, RMContainer container,
+ boolean isReserved) {
+ assert node != null;
+
+ if (isReserved) {
+ when(node.getReservedContainer()).thenReturn(container);
+ } else {
+ node.getCopiedListOfRunningContainers().add(container);
+ Resources.subtractFrom(node.getUnallocatedResource(),
+ container.getAllocatedResource());
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java
new file mode 100644
index 0000000..b168612
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+class MockApplications {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ MockApplications.class);
+
+ private String config;
+ private ResourceCalculator resourceCalculator;
+ private Map<String, CSQueue> nameToCSQueues;
+ private Map<String, Resource> partitionToResource;
+ private Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes;
+ private Map<String, Set<String>> userMap = new HashMap<>();
+ private Map<String, Map<String, HashMap<String, ResourceUsage>>> userResourceUsagePerLabel = new HashMap<>();
+ private int id = 1;
+
+ MockApplications(String appsConfig,
+ ResourceCalculator resourceCalculator,
+ Map<String, CSQueue> nameToCSQueues,
+ Map<String, Resource> partitionToResource,
+ Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes) {
+ this.config = appsConfig;
+ this.resourceCalculator = resourceCalculator;
+ this.nameToCSQueues = nameToCSQueues;
+ this.partitionToResource = partitionToResource;
+ this.nodeIdToSchedulerNodes = nodeIdToSchedulerNodes;
+ init();
+ }
+
+ /**
+ * Format is:
+ * <pre>
+ * queueName\t // app1
+ * (priority,resource,host,expression,#repeat,reserved)
+ * (priority,resource,host,expression,#repeat,reserved);
+ * queueName\t // app2
+ * </pre>
+ */
+ private void init() {
+ int mulp = -1;
+ for (String appConfig : config.split(";")) {
+ String[] appConfigComponents = appConfig.split("\t");
+ String queueName = appConfigComponents[0];
+ if (mulp <= 0 && appConfigComponents.length > 2 && appConfigComponents[2] != null) {
+ LOG.info("Mulp value: " + appConfigComponents[2]);
+ mulp = 100 / (Integer.parseInt(appConfigComponents[2]));
+ }
+
+ String containersConfig = appConfigComponents[1];
+ MockApplication mockApp = new MockApplication(id, containersConfig, queueName);
+ new MockContainers(mockApp, nameToCSQueues, nodeIdToSchedulerNodes);
+ add(mockApp);
+ id++;
+ }
+ setupUserResourceUsagePerLabel(resourceCalculator, mulp);
+ }
+
+ private void add(MockApplication mockApp) {
+ // add to LeafQueue
+ LeafQueue queue = (LeafQueue) nameToCSQueues.get(mockApp.queueName);
+ queue.getApplications().add(mockApp.app);
+ queue.getAllApplications().add(mockApp.app);
+ when(queue.getMinimumAllocation()).thenReturn(Resource.newInstance(1,1));
+ when(mockApp.app.getCSLeafQueue()).thenReturn(queue);
+
+ LOG.debug("Application mock: queue: " + mockApp.queueName + ", appId:" + mockApp.app);
+
+ Set<String> users = userMap.computeIfAbsent(mockApp.queueName, k -> new HashSet<>());
+ users.add(mockApp.app.getUser());
+
+ String label = mockApp.app.getAppAMNodePartitionName();
+
+ // Get label to queue
+ Map<String, HashMap<String, ResourceUsage>> userResourceUsagePerQueue =
+ userResourceUsagePerLabel.computeIfAbsent(label, k -> new HashMap<>());
+
+ // Get queue to user based resource map
+ Map<String, ResourceUsage> userResourceUsage =
+ userResourceUsagePerQueue.computeIfAbsent(mockApp.queueName, k -> new HashMap<>());
+
+ // Get user to its resource usage.
+ ResourceUsage usage = userResourceUsage.get(mockApp.app.getUser());
+ if (null == usage) {
+ usage = new ResourceUsage();
+ userResourceUsage.put(mockApp.app.getUser(), usage);
+ }
+
+ usage.incAMUsed(mockApp.app.getAMResource(label));
+ usage.incUsed(mockApp.app.getAppAttemptResourceUsage().getUsed(label));
+ }
+
+ private void setupUserResourceUsagePerLabel(ResourceCalculator resourceCalculator,
+ int mulp) {
+ for (String label : userResourceUsagePerLabel.keySet()) {
+ for (String queueName : userMap.keySet()) {
+ LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+ // Currently we have user-limit test support only for default label.
+ Resource toResourcePartition = partitionToResource.get("");
+ Resource capacity = Resources.multiply(toResourcePartition,
+ queue.getQueueCapacities().getAbsoluteCapacity());
+ Set<String> users = userMap.get(queue.getQueueName());
+ //TODO: Refactor this test class to use queue path internally like
+ // CS does from now on
+ if (users == null) {
+ users = userMap.get(queue.getQueuePath());
+ }
+ when(queue.getAllUsers()).thenReturn(users);
+ Resource userLimit = calculateUserLimit(resourceCalculator, mulp, capacity,
+ users);
+ LOG.debug("Updating user-limit from mock: toResourcePartition="
+ + toResourcePartition + ", capacity=" + capacity
+ + ", users.size()=" + users.size() + ", userLimit= " + userLimit
+ + ",label= " + label + ",queueName= " + queueName);
+
+ setupUserToQueueSettings(label, queueName, queue, users, userLimit);
+ }
+ }
+ }
+
+ private void setupUserToQueueSettings(String label, String queueName,
+ LeafQueue queue, Set<String> users, Resource userLimit) {
+ Map<String, ResourceUsage> userResourceUsage =
+ userResourceUsagePerLabel.get(label).get(queueName);
+ for (String userName : users) {
+ User user = new User(userName);
+ if (userResourceUsage != null) {
+ user.setResourceUsage(userResourceUsage.get(userName));
+ }
+ when(queue.getUser(eq(userName))).thenReturn(user);
+ when(queue.getResourceLimitForAllUsers(eq(userName),
+ any(Resource.class), anyString(), any(SchedulingMode.class)))
+ .thenReturn(userLimit);
+ }
+ }
+
+ private Resource calculateUserLimit(ResourceCalculator resourceCalculator,
+ int mulp, Resource capacity, Set<String> users) {
+ if (mulp > 0) {
+ return Resources.divideAndCeil(resourceCalculator, capacity, mulp);
+ } else {
+ return Resources.divideAndCeil(resourceCalculator, capacity, users.size());
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockContainer.java
new file mode 100644
index 0000000..95f7391
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockContainer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class MockContainer {
+ ContainerSpecification containerSpec;
+ public int containerId;
+ private MockApplication mockApp;
+ RMContainerImpl rmContainerMock;
+
+ MockContainer(ContainerSpecification containerSpec,
+ int containerId, MockApplication mockApp) {
+ this.containerSpec = containerSpec;
+ this.containerId = containerId;
+ this.mockApp = mockApp;
+ this.rmContainerMock = mock(RMContainerImpl.class);
+ init();
+ }
+
+ private void init() {
+ Container c = mock(Container.class);
+ when(c.getResource()).thenReturn(containerSpec.resource);
+ when(c.getPriority()).thenReturn(containerSpec.priority);
+ SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(c);
+ when(rmContainerMock.getAllocatedSchedulerKey()).thenReturn(sk);
+ when(rmContainerMock.getAllocatedNode()).thenReturn(containerSpec.nodeId);
+ when(rmContainerMock.getNodeLabelExpression()).thenReturn(containerSpec.label);
+ when(rmContainerMock.getAllocatedResource()).thenReturn(containerSpec.resource);
+ when(rmContainerMock.getContainer()).thenReturn(c);
+ when(rmContainerMock.getApplicationAttemptId()).thenReturn(mockApp.appAttemptId);
+ when(rmContainerMock.getQueueName()).thenReturn(mockApp.queueName);
+ final ContainerId cId = ContainerId.newContainerId(mockApp.appAttemptId,
+ containerId);
+ when(rmContainerMock.getContainerId()).thenReturn(cId);
+ doAnswer(new Answer<Integer>() {
+ @Override
+ public Integer answer(InvocationOnMock invocation) {
+ return cId.compareTo(
+ ((RMContainer) invocation.getArguments()[0]).getContainerId());
+ }
+ }).when(rmContainerMock).compareTo(any(RMContainer.class));
+
+ if (containerId == 1) {
+ when(rmContainerMock.isAMContainer()).thenReturn(true);
+ }
+
+ if (containerSpec.reserved) {
+ when(rmContainerMock.getReservedResource()).thenReturn(containerSpec.resource);
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockContainers.java
new file mode 100644
index 0000000..5a4212d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockContainers.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.Map;
+
+class MockContainers {
+ private MockApplication mockApp;
+ private Map<String, CSQueue> nameToCSQueues;
+ private Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes;
+
+ MockContainers(MockApplication mockApp,
+ Map<String, CSQueue> nameToCSQueues,
+ Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes) {
+ this.mockApp = mockApp;
+ this.nameToCSQueues = nameToCSQueues;
+ this.nodeIdToSchedulerNodes = nodeIdToSchedulerNodes;
+ init();
+ }
+
+ private void init() {
+ String containersConfig = mockApp.containersConfig;
+ int start = containersConfig.indexOf("=") + 1;
+ int end = -1;
+ int containerId = 1;
+
+ while (start < containersConfig.length()) {
+ while (start < containersConfig.length()
+ && containersConfig.charAt(start) != '(') {
+ start++;
+ }
+ if (start >= containersConfig.length()) {
+ throw new IllegalArgumentException(
+ "Error in containers specification, line=" + containersConfig);
+ }
+ end = start + 1;
+ while (end < containersConfig.length()
+ && containersConfig.charAt(end) != ')') {
+ end++;
+ }
+ if (end >= containersConfig.length()) {
+ throw new IllegalArgumentException(
+ "Error in containers specification, line=" + containersConfig);
+ }
+
+ // now we found start/end, get container values
+ String[] values = containersConfig.substring(start + 1, end).split(",");
+ if (values.length < 6 || values.length > 8) {
+ throw new IllegalArgumentException("Format to define container is:"
+ + "(priority,resource,host,label expression,repeat,reserved, pending)");
+ }
+
+ ContainerSpecification.Builder builder = ContainerSpecification.Builder.create()
+ .withPriority(values[0])
+ .withResource(values[1])
+ .withHostname(values[2])
+ .withLabel(values[3])
+ .withRepeat(values[4])
+ .withReserved(values[5]);
+
+ if (values.length >= 7) {
+ builder.withPendingResource(values[6]);
+ }
+ if (values.length == 8) {
+ builder.withUsername(values[7]);
+ }
+ ContainerSpecification containerSpec = builder.build();
+
+ Resource usedResources = Resource.newInstance(0, 0);
+ for (int i = 0; i < containerSpec.repeat; i++) {
+ Resources.addTo(usedResources, containerSpec.resource);
+ MockContainer mockContainer = new MockContainer(containerSpec, containerId, mockApp);
+ FiCaSchedulerNode schedulerNode =
+ nodeIdToSchedulerNodes.get(containerSpec.nodeId);
+ LeafQueue queue = (LeafQueue) nameToCSQueues.get(mockApp.queueName);
+ mockApp.addMockContainer(mockContainer, schedulerNode, queue);
+ containerId++;
+ }
+ mockApp.addAggregatedContainerData(containerSpec, usedResources);
+ start = end + 1;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockNodeLabelsManager.java
new file mode 100644
index 0000000..2be8a78
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockNodeLabelsManager.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework.parseResourceFromString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+class MockNodeLabelsManager {
+ private static final Logger LOG = LoggerFactory.getLogger(MockNodeLabelsManager.class);
+
+ private String config;
+ private final Resource clusterResource;
+ private final Map<String, Resource> partitionToResource;
+ private final RMNodeLabelsManager nodeLabelsManager;
+
+ MockNodeLabelsManager(String config,
+ RMNodeLabelsManager nodeLabelsManager,
+ Map<String, Resource> partitionToResource) throws IOException {
+ this.config = config;
+ this.partitionToResource = partitionToResource;
+ this.clusterResource = Resources.createResource(0);
+ this.nodeLabelsManager = nodeLabelsManager;
+ this.parse();
+ }
+
+ /**
+ * Format is:
+ * <pre>
+ * partition0=total_resource,exclusivity;
+ * partition1=total_resource,exclusivity;
+ * ...
+ * </pre>
+ */
+ private void parse() throws IOException {
+ String[] partitionConfigArr = config.split(";");
+ for (String p : partitionConfigArr) {
+ String partitionName = p.substring(0, p.indexOf("="));
+ Resource res = parseResourceFromString(p.substring(p.indexOf("=") + 1,
+ p.indexOf(",")));
+ boolean exclusivity =
+ Boolean.valueOf(p.substring(p.indexOf(",") + 1));
+ when(nodeLabelsManager.getResourceByLabel(eq(partitionName), any(Resource.class)))
+ .thenReturn(res);
+ when(nodeLabelsManager.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
+
+ // add to partition to resource
+ partitionToResource.put(partitionName, res);
+ LOG.debug("add partition=" + partitionName + " totalRes=" + res
+ + " exclusivity=" + exclusivity);
+ Resources.addTo(clusterResource, res);
+ }
+
+ when(nodeLabelsManager.getClusterNodeLabelNames()).thenReturn(
+ partitionToResource.keySet());
+ }
+
+ public Resource getClusterResource() {
+ return clusterResource;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java
new file mode 100644
index 0000000..ae4ff5a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockQueueHierarchy.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework.parseResourceFromString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class MockQueueHierarchy {
+ private static final Logger LOG = LoggerFactory.getLogger(MockQueueHierarchy.class);
+ private final String ROOT = CapacitySchedulerConfiguration.ROOT;
+ private final ParentQueue rootQueue;
+ private String config;
+ private final CapacityScheduler cs;
+ private CapacitySchedulerConfiguration conf;
+ private final ResourceCalculator resourceCalculator;
+ private final Map<String, CSQueue> nameToCSQueues;
+ private final Map<String, Resource> partitionToResource;
+
+ MockQueueHierarchy(String config,
+ CapacityScheduler cs,
+ CapacitySchedulerConfiguration conf,
+ ResourceCalculator resourceCalculator,
+ Map<String, Resource> partitionToResource) {
+ this.config = config;
+ this.cs = cs;
+ this.conf = conf;
+ this.resourceCalculator = resourceCalculator;
+ this.nameToCSQueues = new HashMap<>();
+ this.partitionToResource = partitionToResource;
+ this.rootQueue = init();
+ }
+
+ public ParentQueue getRootQueue() {
+ return rootQueue;
+ }
+
+ Map<String, CSQueue> getNameToCSQueues() {
+ return nameToCSQueues;
+ }
+
+ /**
+ * Format is:
+ * <pre>
+ * root (<partition-name-1>=[guaranteed max used pending (reserved)],<partition-name-2>=..);
+ * -A(...);
+ * --A1(...);
+ * --A2(...);
+ * -B...
+ * </pre>
+ * ";" splits queues, and there should no empty lines, no extra spaces
+ *
+ * For each queue, it has configurations to specify capacities (to each
+ * partition), format is:
+ * <pre>
+ * -<queueName> (<labelName1>=[guaranteed max used pending], \
+ * <labelName2>=[guaranteed max used pending])
+ * {key1=value1,key2=value2}; // Additional configs
+ * </pre>
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private ParentQueue init() {
+ String[] queueExprArray = config.split(";");
+ ParentQueue rootQueue = null;
+ for (int idx = 0; idx < queueExprArray.length; idx++) {
+ String q = queueExprArray[idx];
+ CSQueue queue;
+
+ // Initialize queue
+ if (isParent(queueExprArray, idx)) {
+ ParentQueue parentQueue = mock(ParentQueue.class);
+ queue = parentQueue;
+ List<CSQueue> children = new ArrayList<>();
+ when(parentQueue.getChildQueues()).thenReturn(children);
+ QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
+ when(policy.getConfigName()).thenReturn(
+ CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+ when(parentQueue.getQueueOrderingPolicy()).thenReturn(policy);
+ } else {
+ LeafQueue leafQueue = mock(LeafQueue.class);
+ final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
+ new Comparator<FiCaSchedulerApp>() {
+ @Override
+ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
+ if (a1.getPriority() != null
+ && !a1.getPriority().equals(a2.getPriority())) {
+ return a1.getPriority().compareTo(a2.getPriority());
+ }
+
+ return a1.getApplicationId()
+ .compareTo(a2.getApplicationId());
+ }
+ });
+ when(leafQueue.getApplications()).thenReturn(apps);
+ when(leafQueue.getAllApplications()).thenReturn(apps);
+ OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
+ String opName = conf.get(CapacitySchedulerConfiguration.PREFIX
+ + CapacitySchedulerConfiguration.ROOT + "." + getQueueName(q)
+ + ".ordering-policy", "fifo");
+ if (opName.equals("fair")) {
+ so = Mockito.spy(new FairOrderingPolicy<>());
+ }
+ when(so.getPreemptionIterator()).thenAnswer(new Answer() {
+ public Object answer(InvocationOnMock invocation) {
+ return apps.descendingIterator();
+ }
+ });
+ when(leafQueue.getOrderingPolicy()).thenReturn(so);
+
+ Map<String, TreeSet<RMContainer>> ignorePartitionContainers =
+ new HashMap<>();
+ when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
+ ignorePartitionContainers);
+ queue = leafQueue;
+ }
+
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ when(queue.getReadLock()).thenReturn(lock.readLock());
+ setupQueue(queue, q, queueExprArray, idx);
+ if (queue.getQueuePath().equals(ROOT)) {
+ rootQueue = (ParentQueue) queue;
+ }
+ }
+ return rootQueue;
+ }
+
+ private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
+ int idx) {
+ LOG.debug("*** Setup queue, source=" + q);
+ String queuePath = null;
+
+ int myLevel = getLevel(q);
+ if (0 == myLevel) {
+ // It's root
+ when(queue.getQueuePath()).thenReturn(ROOT);
+ queuePath = ROOT;
+ }
+
+ String queueName = getQueueName(q);
+ when(queue.getQueueName()).thenReturn(queueName);
+
+ // Setup parent queue, and add myself to parentQueue.children-list
+ ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
+ if (null != parentQueue) {
+ when(queue.getParent()).thenReturn(parentQueue);
+ parentQueue.getChildQueues().add(queue);
+
+ // Setup my path
+ queuePath = parentQueue.getQueuePath() + "." + queueName;
+ }
+ when(queue.getQueuePath()).thenReturn(queuePath);
+
+ QueueCapacities qc = new QueueCapacities(0 == myLevel);
+ ResourceUsage ru = new ResourceUsage();
+ QueueResourceQuotas qr = new QueueResourceQuotas();
+
+ when(queue.getQueueCapacities()).thenReturn(qc);
+ when(queue.getQueueResourceUsage()).thenReturn(ru);
+ when(queue.getQueueResourceQuotas()).thenReturn(qr);
+
+ LOG.debug("Setup queue, short name=" + queue.getQueueName() + " path="
+ + queue.getQueuePath());
+ LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
+ .getQueuePath()));
+
+ // Setup other fields like used resource, guaranteed resource, etc.
+ String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
+ for (String s : capacitySettingStr.split(",")) {
+ String partitionName = s.substring(0, s.indexOf("="));
+ String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
+ // Add a small epsilon to capacities to avoid truncate when doing
+ // Resources.multiply
+ float epsilon = 1e-6f;
+ Resource toResourcePerPartition = partitionToResource.get(partitionName);
+ float absGuaranteed = Resources.divide(resourceCalculator, toResourcePerPartition,
+ parseResourceFromString(values[0].trim()), toResourcePerPartition)
+ + epsilon;
+ float absMax = Resources.divide(resourceCalculator, toResourcePerPartition,
+ parseResourceFromString(values[1].trim()), toResourcePerPartition)
+ + epsilon;
+ float absUsed = Resources.divide(resourceCalculator, toResourcePerPartition,
+ parseResourceFromString(values[2].trim()), toResourcePerPartition)
+ + epsilon;
+ float used = Resources.divide(resourceCalculator, toResourcePerPartition,
+ parseResourceFromString(values[2].trim()),
+ parseResourceFromString(values[0].trim())) + epsilon;
+ Resource pending = parseResourceFromString(values[3].trim());
+ qc.setAbsoluteCapacity(partitionName, absGuaranteed);
+ qc.setAbsoluteMaximumCapacity(partitionName, absMax);
+ qc.setAbsoluteUsedCapacity(partitionName, absUsed);
+ qc.setUsedCapacity(partitionName, used);
+ qr.setEffectiveMaxResource(parseResourceFromString(values[1].trim()));
+ qr.setEffectiveMinResource(parseResourceFromString(values[0].trim()));
+ qr.setEffectiveMaxResource(partitionName,
+ parseResourceFromString(values[1].trim()));
+ qr.setEffectiveMinResource(partitionName,
+ parseResourceFromString(values[0].trim()));
+ when(queue.getUsedCapacity()).thenReturn(used);
+ when(queue.getEffectiveCapacity(partitionName))
+ .thenReturn(parseResourceFromString(values[0].trim()));
+ when(queue.getEffectiveMaxCapacity(partitionName))
+ .thenReturn(parseResourceFromString(values[1].trim()));
+ ru.setPending(partitionName, pending);
+ // Setup reserved resource if it contained by input config
+ Resource reserved = Resources.none();
+ if(values.length == 5) {
+ reserved = parseResourceFromString(values[4].trim());
+ ru.setReserved(partitionName, reserved);
+ }
+ if (!isParent(queueExprArray, idx)) {
+ LeafQueue lq = (LeafQueue) queue;
+ when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
+ isA(String.class), eq(false))).thenReturn(pending);
+ when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
+ isA(String.class), eq(true))).thenReturn(
+ Resources.subtract(pending, reserved));
+ }
+ ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
+
+ LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+ + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
+ + ",abs_used" + absUsed + ",pending_resource=" + pending
+ + ", reserved_resource=" + reserved + "]");
+ }
+
+ // Setup preemption disabled
+ when(queue.getPreemptionDisabled()).thenReturn(
+ conf.getPreemptionDisabled(queuePath, false));
+
+ // Setup other queue configurations
+ Map<String, String> otherConfigs = getOtherConfigurations(
+ queueExprArray[idx]);
+ if (otherConfigs.containsKey("priority")) {
+ when(queue.getPriority()).thenReturn(
+ Priority.newInstance(Integer.valueOf(otherConfigs.get("priority"))));
+ } else {
+ // set queue's priority to 0 by default
+ when(queue.getPriority()).thenReturn(Priority.newInstance(0));
+ }
+
+ // Setup disable preemption of queues
+ if (otherConfigs.containsKey("disable_preemption")) {
+ when(queue.getPreemptionDisabled()).thenReturn(
+ Boolean.valueOf(otherConfigs.get("disable_preemption")));
+ }
+
+ //TODO: Refactor this test class to use queue path internally like CS
+ // does from now on
+ nameToCSQueues.put(queuePath, queue);
+ nameToCSQueues.put(queueName, queue);
+ when(cs.getQueue(eq(queuePath))).thenReturn(queue);
+ when(cs.getQueue(eq(queueName))).thenReturn(queue);
+ when(cs.normalizeQueueName(eq(queuePath))).thenReturn(queuePath);
+ when(cs.normalizeQueueName(eq(queueName))).thenReturn(queuePath);
+ }
+
+ /**
+ * Get additional queue's configurations
+ * @param queueExpr queue expr
+ * @return maps of configs
+ */
+ private Map<String, String> getOtherConfigurations(String queueExpr) {
+ if (queueExpr.contains("{")) {
+ int left = queueExpr.indexOf('{');
+ int right = queueExpr.indexOf('}');
+
+ if (right > left) {
+ Map<String, String> configs = new HashMap<>();
+
+ String subStr = queueExpr.substring(left + 1, right);
+ for (String kv : subStr.split(",")) {
+ if (kv.contains("=")) {
+ String key = kv.substring(0, kv.indexOf("="));
+ String value = kv.substring(kv.indexOf("=") + 1);
+ configs.put(key, value);
+ }
+ }
+
+ return configs;
+ }
+ }
+
+ return Collections.emptyMap();
+ }
+
+ private String getQueueName(String q) {
+ int idx = 0;
+ // find first != '-' char
+ while (idx < q.length() && q.charAt(idx) == '-') {
+ idx++;
+ }
+ if (idx == q.length()) {
+ throw new IllegalArgumentException("illegal input:" + q);
+ }
+ // name = after '-' and before '('
+ String name = q.substring(idx, q.indexOf('('));
+ if (name.isEmpty()) {
+ throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
+ }
+ if (name.contains(".")) {
+ throw new IllegalArgumentException("queue name shouldn't contain '.':"
+ + name);
+ }
+ return name;
+ }
+
+ private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) {
+ idx--;
+ while (idx >= 0) {
+ int level = getLevel(queueExprArray[idx]);
+ if (level < myLevel) {
+ String parentQueueName = getQueueName(queueExprArray[idx]);
+ return (ParentQueue) nameToCSQueues.get(parentQueueName);
+ }
+ idx--;
+ }
+
+ return null;
+ }
+
+ /**
+ * Get if a queue is ParentQueue
+ */
+ private boolean isParent(String[] queues, int idx) {
+ int myLevel = getLevel(queues[idx]);
+ idx++;
+ while (idx < queues.length && getLevel(queues[idx]) == myLevel) {
+ idx++;
+ }
+ if (idx >= queues.length || getLevel(queues[idx]) < myLevel) {
+ // It's a LeafQueue
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ /**
+ * Level of a queue is how many "-" at beginning, root's level is 0
+ */
+ private int getLevel(String q) {
+ int level = 0; // level = how many "-" at beginning
+ while (level < q.length() && q.charAt(level) == '-') {
+ level++;
+ }
+ return level;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockSchedulerNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockSchedulerNodes.java
new file mode 100644
index 0000000..cdea35a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockSchedulerNodes.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework.parseResourceFromString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class MockSchedulerNodes {
+ private static final Logger LOG = LoggerFactory.getLogger(MockSchedulerNodes.class);
+ private String config;
+ private Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes = new HashMap<>();
+
+ MockSchedulerNodes(String config) {
+ this.config = config;
+ init();
+ }
+
+ /**
+ * Format is:
+ * host1=partition[ res=resource];
+ * host2=partition[ res=resource];
+ */
+ private void init() {
+ String[] nodesConfigStrArray = config.split(";");
+ for (String p : nodesConfigStrArray) {
+ String[] arr = p.split(" ");
+
+ NodeId nodeId = NodeId.newInstance(arr[0].substring(0, arr[0].indexOf("=")), 1);
+ String partition = arr[0].substring(arr[0].indexOf("=") + 1);
+
+ FiCaSchedulerNode sn = mock(FiCaSchedulerNode.class);
+ when(sn.getNodeID()).thenReturn(nodeId);
+ when(sn.getPartition()).thenReturn(partition);
+
+ Resource totalRes = Resources.createResource(0);
+ if (arr.length > 1) {
+ String res = arr[1];
+ if (res.contains("res=")) {
+ String resString = res.substring(
+ res.indexOf("res=") + "res=".length());
+ totalRes = parseResourceFromString(resString);
+ }
+ }
+ when(sn.getTotalResource()).thenReturn(totalRes);
+ when(sn.getUnallocatedResource()).thenReturn(Resources.clone(totalRes));
+
+ // TODO, add settings of killable resources when necessary
+ when(sn.getTotalKillableResources()).thenReturn(Resources.none());
+
+ List<RMContainer> liveContainers = new ArrayList<>();
+ when(sn.getCopiedListOfRunningContainers()).thenReturn(liveContainers);
+
+ nodeIdToSchedulerNodes.put(nodeId, sn);
+
+ LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition);
+ }
+ }
+
+ Map<NodeId, FiCaSchedulerNode> getNodeIdToSchedulerNodes() {
+ return nodeIdToSchedulerNodes;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ProportionalCapacityPreemptionPolicyMockFramework.java
new file mode 100644
index 0000000..024ec86
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
+
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicyForNodePartitions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.mockito.ArgumentMatcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.event.Event;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ProportionalCapacityPreemptionPolicyMockFramework {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestProportionalCapacityPreemptionPolicyForNodePartitions.class);
+ private static final double ALLOWED_CAPACITY_DELTA = 1e-3;
+
+ private Map<String, CSQueue> nameToCSQueues;
+ private Map<String, Resource> partitionToResource;
+ private Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes;
+ private RMNodeLabelsManager nodeLabelsManager;
+ public RMContext rmContext;
+
+ public ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
+ public Clock mClock;
+ public CapacitySchedulerConfiguration conf;
+ public CapacityScheduler cs;
+ @SuppressWarnings("rawtypes")
+ public EventHandler<Event> eventHandler;
+ public ProportionalCapacityPreemptionPolicy policy;
+ private Resource clusterResource;
+ // Initialize resource map
+ public Map<String, ResourceInformation> riMap = new HashMap<>();
+
+ private void resetResourceInformationMap() {
+ // Initialize mandatory resources
+ ResourceInformation memory = ResourceInformation.newInstance(
+ ResourceInformation.MEMORY_MB.getName(),
+ ResourceInformation.MEMORY_MB.getUnits(),
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+ ResourceInformation vcores = ResourceInformation.newInstance(
+ ResourceInformation.VCORES.getName(),
+ ResourceInformation.VCORES.getUnits(),
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+ riMap.put(ResourceInformation.MEMORY_URI, memory);
+ riMap.put(ResourceInformation.VCORES_URI, vcores);
+
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setup() {
+ resetResourceInformationMap();
+
+ org.apache.log4j.Logger.getRootLogger().setLevel(
+ org.apache.log4j.Level.DEBUG);
+
+ conf = new CapacitySchedulerConfiguration(new Configuration(false));
+ conf.setLong(
+ CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
+ conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+ 3000);
+ // report "ideal" preempt
+ conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+ (float) 1.0);
+ conf.setFloat(
+ CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+ (float) 1.0);
+
+ mClock = mock(Clock.class);
+ cs = mock(CapacityScheduler.class);
+ when(cs.getResourceCalculator()).thenReturn(resourceCalculator);
+ when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
+ when(cs.getConfiguration()).thenReturn(conf);
+
+ nodeLabelsManager = mock(RMNodeLabelsManager.class);
+ eventHandler = mock(EventHandler.class);
+
+ rmContext = mock(RMContext.class);
+ when(rmContext.getNodeLabelManager()).thenReturn(nodeLabelsManager);
+ Dispatcher dispatcher = mock(Dispatcher.class);
+ when(rmContext.getDispatcher()).thenReturn(dispatcher);
+ when(dispatcher.getEventHandler()).thenReturn(eventHandler);
+ when(cs.getRMContext()).thenReturn(rmContext);
+
+ partitionToResource = new HashMap<>();
+ nodeIdToSchedulerNodes = new HashMap<>();
+ nameToCSQueues = new HashMap<>();
+ clusterResource = Resource.newInstance(0, 0);
+ }
+
+ @After
+ public void cleanup() {
+ resetResourceInformationMap();
+ }
+
+ public void buildEnv(String labelsConfig, String nodesConfig,
+ String queuesConfig, String appsConfig) throws IOException {
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false);
+ }
+
+ public void buildEnv(String labelsConfig, String nodesConfig,
+ String queuesConfig, String appsConfig,
+ boolean useDominantResourceCalculator) throws IOException {
+ if (useDominantResourceCalculator) {
+ when(cs.getResourceCalculator()).thenReturn(
+ new DominantResourceCalculator());
+ }
+
+ MockNodeLabelsManager mockNodeLabelsManager =
+ new MockNodeLabelsManager(labelsConfig,
+ nodeLabelsManager, partitionToResource);
+ clusterResource = mockNodeLabelsManager.getClusterResource();
+
+ MockSchedulerNodes mockSchedulerNodes =
+ new MockSchedulerNodes(nodesConfig);
+ nodeIdToSchedulerNodes = mockSchedulerNodes.getNodeIdToSchedulerNodes();
+ addNodeIdDataToScheduler();
+
+ ParentQueue root = parseQueueConfig(queuesConfig);
+
+ when(cs.getRootQueue()).thenReturn(root);
+ when(cs.getClusterResource()).thenReturn(clusterResource);
+ new MockApplications(appsConfig, resourceCalculator, nameToCSQueues,
+ partitionToResource, nodeIdToSchedulerNodes);
+
+ policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+ }
+
+ private ParentQueue parseQueueConfig(String queuesConfig) {
+ MockQueueHierarchy mockQueueHierarchy =
+ new MockQueueHierarchy(queuesConfig, cs, conf, resourceCalculator,
+ partitionToResource);
+ this.nameToCSQueues = mockQueueHierarchy.getNameToCSQueues();
+ return mockQueueHierarchy.getRootQueue();
+ }
+
+ private void addNodeIdDataToScheduler() {
+ for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
+ when(cs.getSchedulerNode(nodeId)).thenReturn(
+ nodeIdToSchedulerNodes.get(nodeId));
+ }
+ List<FiCaSchedulerNode> allNodes = new ArrayList<>(
+ nodeIdToSchedulerNodes.values());
+ when(cs.getAllNodes()).thenReturn(allNodes);
+ }
+
+ protected void updateQueueConfig(String queuesConfig) {
+ ParentQueue root = parseQueueConfig(queuesConfig);
+ when(cs.getRootQueue()).thenReturn(root);
+ }
+
+ //TODO this can probably be replaced with some parser logic already implemented somewhere
+ static Resource parseResourceFromString(String resString) {
+ String[] resource = resString.split(":");
+ Resource res;
+ if (resource.length == 1) {
+ res = Resources.createResource(Integer.valueOf(resource[0]));
+ } else {
+ res = Resources.createResource(Integer.valueOf(resource[0]),
+ Integer.valueOf(resource[1]));
+ if (resource.length > 2) {
+ // Using the same order of resources from ResourceUtils, set resource
+ // information.
+ ResourceInformation[] storedResourceInfo = ResourceUtils
+ .getResourceTypesArray();
+ for (int i = 2; i < resource.length; i++) {
+ res.setResourceInformation(storedResourceInfo[i].getName(),
+ ResourceInformation.newInstance(storedResourceInfo[i].getName(),
+ storedResourceInfo[i].getUnits(),
+ Integer.valueOf(resource[i])));
+ }
+ }
+ }
+ return res;
+ }
+
+ public ApplicationAttemptId getAppAttemptId(int id) {
+ ApplicationId appId = ApplicationId.newInstance(0L, id);
+ return ApplicationAttemptId.newInstance(appId, 1);
+ }
+
+ protected void checkContainerNodesInApp(FiCaSchedulerApp app,
+ int expectedContainersNumber, String host) {
+ NodeId nodeId = NodeId.newInstance(host, 1);
+ int num = 0;
+ for (RMContainer c : app.getLiveContainers()) {
+ if (c.getAllocatedNode().equals(nodeId)) {
+ num++;
+ }
+ }
+ for (RMContainer c : app.getReservedContainers()) {
+ if (c.getAllocatedNode().equals(nodeId)) {
+ num++;
+ }
+ }
+ Assert.assertEquals(expectedContainersNumber, num);
+ }
+
+ public FiCaSchedulerApp getApp(String queueName, int appId) {
+ for (FiCaSchedulerApp app : ((LeafQueue) cs.getQueue(queueName))
+ .getApplications()) {
+ if (app.getApplicationId().getId() == appId) {
+ return app;
+ }
+ }
+ return null;
+ }
+
+ protected void checkAbsCapacities(CSQueue queue, String partition,
+ float guaranteed, float max, float used) {
+ QueueCapacities qc = queue.getQueueCapacities();
+ Assert.assertEquals(guaranteed, qc.getAbsoluteCapacity(partition),
+ ALLOWED_CAPACITY_DELTA);
+ Assert.assertEquals(max, qc.getAbsoluteMaximumCapacity(partition),
+ ALLOWED_CAPACITY_DELTA);
+ Assert.assertEquals(used, qc.getAbsoluteUsedCapacity(partition),
+ ALLOWED_CAPACITY_DELTA);
+ }
+
+ protected void checkPendingResource(CSQueue queue, String partition,
+ int pending) {
+ ResourceUsage ru = queue.getQueueResourceUsage();
+ Assert.assertEquals(pending, ru.getPending(partition).getMemorySize());
+ }
+
+ protected void checkPriority(CSQueue queue, int expectedPriority) {
+ Assert.assertEquals(expectedPriority, queue.getPriority().getPriority());
+ }
+
+ protected void checkReservedResource(CSQueue queue, String partition,
+ int reserved) {
+ ResourceUsage ru = queue.getQueueResourceUsage();
+ Assert.assertEquals(reserved, ru.getReserved(partition).getMemorySize());
+ }
+
+ public static class IsPreemptionRequestForQueueAndNode
+ implements ArgumentMatcher<ContainerPreemptEvent> {
+ private final ApplicationAttemptId appAttId;
+ private final String queueName;
+ private final NodeId nodeId;
+
+ public IsPreemptionRequestForQueueAndNode(ApplicationAttemptId appAttId,
+ String queueName, NodeId nodeId) {
+ this.appAttId = appAttId;
+ this.queueName = queueName;
+ this.nodeId = nodeId;
+ }
+ @Override
+ public boolean matches(ContainerPreemptEvent cpe) {
+ return appAttId.equals(cpe.getAppId())
+ && queueName.equals(cpe.getContainer().getQueueName())
+ && nodeId.equals(cpe.getContainer().getAllocatedNode());
+ }
+ @Override
+ public String toString() {
+ return appAttId.toString();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org