You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by eb...@apache.org on 2019/12/20 19:33:36 UTC
[hadoop] branch branch-3.2 updated: YARN-10009. In Capacity
Scheduler,
DRC can treat minimum user limit percent as a max when custom resource is
defined. Contributed by Eric Payne
This is an automated email from the ASF dual-hosted git repository.
ebadger pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 355ec33 YARN-10009. In Capacity Scheduler, DRC can treat minimum user limit percent as a max when custom resource is defined. Contributed by Eric Payne
355ec33 is described below
commit 355ec3341685deb93a40daffa3f28965b7609286
Author: Eric Badger <eb...@verizonmedia.com>
AuthorDate: Fri Dec 20 19:32:36 2019 +0000
YARN-10009. In Capacity Scheduler, DRC can treat minimum user limit percent as a max when custom resource is defined. Contributed by Eric Payne
---
.../util/resource/DominantResourceCalculator.java | 12 +-
...estCapacitySchedulerWithMultiResourceTypes.java | 140 +++++++++++++++++++++
2 files changed, 148 insertions(+), 4 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index e472149..f2ebffc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -428,10 +428,14 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override
public Resource divideAndCeil(Resource numerator, float denominator) {
- return Resources.createResource(
- divideAndCeil(numerator.getMemorySize(), denominator),
- divideAndCeil(numerator.getVirtualCores(), denominator)
- );
+ Resource ret = Resource.newInstance(numerator);
+ int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+ for (int i = 0; i < maxLength; i++) {
+ ResourceInformation resourceInformation = ret.getResourceInformation(i);
+ resourceInformation
+ .setValue(divideAndCeil(resourceInformation.getValue(), denominator));
+ }
+ return ret;
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWithMultiResourceTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWithMultiResourceTypes.java
index 720e787..fa2b9935 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWithMultiResourceTypes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWithMultiResourceTypes.java
@@ -18,7 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -28,12 +36,24 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -343,4 +363,124 @@ public class TestCapacitySchedulerWithMultiResourceTypes {
Assert.assertEquals(0, report_nm1.getAvailableResource()
.getResourceInformation(ResourceInformation.GPU_URI).getValue());
}
+
+
+ @Test(timeout = 300000)
+ public void testConsumeAllExtendedResourcesWithSmallMinUserLimitPct()
+ throws Exception {
+ int GB = 1024;
+
+ // Initialize resource map for 3 types.
+ Map<String, ResourceInformation> riMap = new HashMap<>();
+
+ // Initialize mandatory resources
+ ResourceInformation memory = ResourceInformation.newInstance(
+ ResourceInformation.MEMORY_MB.getName(),
+ ResourceInformation.MEMORY_MB.getUnits(),
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+ ResourceInformation vcores = ResourceInformation.newInstance(
+ ResourceInformation.VCORES.getName(),
+ ResourceInformation.VCORES.getUnits(),
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+ ResourceInformation res1 = ResourceInformation.newInstance("res_1",
+ "", 0, 10);
+ riMap.put(ResourceInformation.MEMORY_URI, memory);
+ riMap.put(ResourceInformation.VCORES_URI, vcores);
+ riMap.put("res_1", res1);
+
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+ CapacitySchedulerConfiguration csconf =
+ new CapacitySchedulerConfiguration();
+ csconf.set("yarn.resource-types", "res_1");
+ csconf.set("yarn.resource-types.res_1.minimum-allocation", "0");
+ csconf.set("yarn.resource-types.res_1.maximum-allocation", "10");
+ csconf.setResourceComparator(DominantResourceCalculator.class);
+
+ // Define top-level queues
+ csconf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] {"a", "b"});
+
+ // Set each queue to contain 50% each.
+ csconf.setCapacity(A_QUEUE, A_CAPACITY);
+ csconf.setCapacity(B_QUEUE, B_CAPACITY);
+ csconf.setMaximumCapacity(A_QUEUE, 100.0f);
+ csconf.setUserLimitFactor(A_QUEUE, 2);
+
+ YarnConfiguration yarnConf = new YarnConfiguration(csconf);
+ // Don't reset resource types since we have already configured resource
+ // types
+ yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES,
+ false);
+ yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+
+ MockRM rm = new MockRM(yarnConf);
+ rm.start();
+
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ LeafQueue qb = (LeafQueue)cs.getQueue("a");
+ // Setting minimum user limit percent should not affect max user resource
+ // limit using extended resources with DRF (see YARN-10009).
+ qb.setUserLimit(25);
+
+ // add app 1
+ ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 1);
+
+ RMAppAttemptMetrics attemptMetric =
+ new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
+ RMAppImpl app = mock(RMAppImpl.class);
+ when(app.getApplicationId()).thenReturn(appId);
+ RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+ Container container = mock(Container.class);
+ when(attempt.getMasterContainer()).thenReturn(container);
+ ApplicationSubmissionContext submissionContext = mock(
+ ApplicationSubmissionContext.class);
+ when(attempt.getSubmissionContext()).thenReturn(submissionContext);
+ when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
+ when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
+ when(app.getCurrentAppAttempt()).thenReturn(attempt);
+
+ rm.getRMContext().getRMApps().put(appId, app);
+
+ SchedulerEvent addAppEvent =
+ new AppAddedSchedulerEvent(appId, "a", "user1");
+ cs.handle(addAppEvent);
+ SchedulerEvent addAttemptEvent =
+ new AppAttemptAddedSchedulerEvent(appAttemptId, false);
+ cs.handle(addAttemptEvent);
+
+ // add nodes to cluster. Cluster has 20GB, 20 vcores, 80 res_1s.
+ HashMap<String, Long> resMap = new HashMap<String, Long>();
+ resMap.put("res_1", 80L);
+ Resource newResource = Resource.newInstance(2048 * GB, 100, resMap);
+ RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
+ cs.handle(new NodeAddedSchedulerEvent(node));
+
+ FiCaSchedulerApp fiCaApp1 =
+ cs.getSchedulerApplications().get(app.getApplicationId())
+ .getCurrentAppAttempt();
+
+ // allocate 8 containers for app1 with 1GB memory, 1 vcore, 10 res_1s
+ for (int i = 0; i < 8; i++) {
+ fiCaApp1.updateResourceRequests(Collections.singletonList(
+ ResourceRequest.newBuilder()
+ .capability(TestUtils.createResource(1 * GB, 1,
+ ImmutableMap.of("res_1", 10)))
+ .numContainers(1)
+ .resourceName("*")
+ .build()));
+ cs.handle(new NodeUpdateSchedulerEvent(node));
+ }
+ Assert.assertEquals(8*GB, fiCaApp1.getCurrentConsumption().getMemorySize());
+ Assert.assertEquals(80,
+ fiCaApp1.getCurrentConsumption()
+ .getResourceInformation("res_1").getValue());
+
+ rm.close();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org