You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2021/01/26 16:36:19 UTC

[hadoop] 01/02: YARN-10531. Be able to disable user limit factor for CapacityScheduler Leaf Queue. (Qi Zhu via wangda)

This is an automated email from the ASF dual-hosted git repository.

wangda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit b7384a8d02ae3cace8c7b5eae84f8f3916a0b177
Author: Wangda Tan <wa...@apache.org>
AuthorDate: Thu Jan 21 09:27:37 2021 -0800

    YARN-10531. Be able to disable user limit factor for CapacityScheduler Leaf Queue. (Qi Zhu via wangda)
    
    Change-Id: I670e5525619b320745254609c48e7e1afb084835
---
 .../scheduler/capacity/AbstractCSQueue.java        |   9 +-
 .../scheduler/capacity/LeafQueue.java              |  26 ++++-
 .../scheduler/capacity/ParentQueue.java            |   4 +-
 .../scheduler/capacity/PlanQueue.java              |   3 +
 .../scheduler/capacity/UsersManager.java           |  12 ++-
 .../TestCapacitySchedulerNewQueueAutoCreation.java |  35 +++++++
 .../scheduler/capacity/TestLeafQueue.java          | 110 ++++++++++++++++++++-
 7 files changed, 190 insertions(+), 9 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index bc3ff22..12ce05f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -1535,8 +1535,13 @@ public abstract class AbstractCSQueue implements CSQueue {
       leafQueue.setMaxApplications(maxApplications);
 
       int maxApplicationsPerUser = Math.min(maxApplications,
-          (int) (maxApplications * (leafQueue.getUsersManager().getUserLimit()
-              / 100.0f) * leafQueue.getUsersManager().getUserLimitFactor()));
+          (int) (maxApplications
+              * (leafQueue.getUsersManager().getUserLimit() / 100.0f)
+              * leafQueue.getUsersManager().getUserLimitFactor()));
+      if (leafQueue.getUsersManager().getUserLimitFactor() == -1) {
+        maxApplicationsPerUser =  maxApplications;
+      }
+
       leafQueue.setMaxApplicationsPerUser(maxApplicationsPerUser);
       LOG.info("LeafQueue:" + leafQueue.getQueuePath() + ", maxApplications="
           + maxApplications + ", maxApplicationsPerUser="
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 15c321f..6bf8d0a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -708,16 +708,33 @@ public class LeafQueue extends AbstractCSQueue {
           queueCapacities.getMaxAMResourcePercentage(nodePartition)
               * effectiveUserLimit * usersManager.getUserLimitFactor(),
           minimumAllocation);
+
+      if (getUserLimitFactor() == -1) {
+        userAMLimit = Resources.multiplyAndNormalizeUp(
+            resourceCalculator, queuePartitionResource,
+            queueCapacities.getMaxAMResourcePercentage(nodePartition),
+            minimumAllocation);
+      }
+
       userAMLimit =
           Resources.min(resourceCalculator, lastClusterResource,
               userAMLimit,
               Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
 
-      Resource preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp(
+      Resource preWeighteduserAMLimit =
+          Resources.multiplyAndNormalizeUp(
           resourceCalculator, queuePartitionResource,
           queueCapacities.getMaxAMResourcePercentage(nodePartition)
               * preWeightedUserLimit * usersManager.getUserLimitFactor(),
           minimumAllocation);
+
+      if (getUserLimitFactor() == -1) {
+        preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp(
+            resourceCalculator, queuePartitionResource,
+            queueCapacities.getMaxAMResourcePercentage(nodePartition),
+            minimumAllocation);
+      }
+
       preWeighteduserAMLimit =
           Resources.min(resourceCalculator, lastClusterResource,
               preWeighteduserAMLimit,
@@ -1896,9 +1913,14 @@ public class LeafQueue extends AbstractCSQueue {
       maxApplications =
           (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
     }
-    maxApplicationsPerUser = Math.min(maxApplications,
+    maxApplicationsPerUser =
+        Math.min(maxApplications,
         (int) (maxApplications * (usersManager.getUserLimit() / 100.0f)
             * usersManager.getUserLimitFactor()));
+
+    if (getUserLimitFactor() == -1) {
+      maxApplicationsPerUser = maxApplications;
+    }
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 0a2f082..b412e8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -478,8 +478,8 @@ public class ParentQueue extends AbstractCSQueue {
         new CapacitySchedulerConfiguration(
         csContext.getConfiguration(), false);
     if (isLeaf) {
-      // FIXME: Ideally we should disable user limit factor, see YARN-10531
-      // dupCSConfig.setUserLimitFactor(childQueuePath, );
+      // set to -1, to disable it
+      dupCSConfig.setUserLimitFactor(childQueuePath, -1);
 
       // Set Max AM percentage to a higher value
       dupCSConfig.setMaximumApplicationMasterResourcePerQueuePercent(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index 4dd3317..f2b0e5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -64,6 +64,9 @@ public class PlanQueue extends AbstractManagedParentQueue {
     float userLimitFactor = conf.getUserLimitFactor(queuePath);
     int maxAppsPerUserForReservation =
         (int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor);
+    if (userLimitFactor == -1) {
+      maxAppsPerUserForReservation = maxAppsForReservation;
+    }
     updateQuotas(userLimit, userLimitFactor, maxAppsForReservation,
         maxAppsPerUserForReservation);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 14766e9..6f7d8f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -791,8 +791,16 @@ public class UsersManager implements AbstractUsersManager {
     // IGNORE_PARTITION_EXCLUSIVITY allocation.
     Resource maxUserLimit = Resources.none();
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
-      maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity,
-          getUserLimitFactor());
+      // If user-limit-factor set to -1, we should disabled user limit.
+      if (getUserLimitFactor() != -1) {
+        maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity,
+            getUserLimitFactor());
+      } else {
+        maxUserLimit = lQueue.
+            getEffectiveMaxCapacityDown(
+                nodePartition, lQueue.getMinimumAllocation());
+      }
+
     } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
       maxUserLimit = partitionResource;
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
index eada112..0c5375e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
@@ -461,6 +461,41 @@ public class TestCapacitySchedulerNewQueueAutoCreation
             "for auto queue creation",
         ((ParentQueue)empty).isEligibleForAutoQueueCreation());
   }
+  
+  public void testAutoCreateQueueUserLimitDisabled() throws Exception {
+    startScheduler();
+    createBasicQueueStructureAndValidate();
+
+    submitApp(cs, USER0, USER0, "root.e-auto");
+
+    AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e-auto");
+    Assert.assertNotNull(e);
+    Assert.assertTrue(e.isDynamicQueue());
+
+    AbstractCSQueue user0 = (AbstractCSQueue) cs.getQueue(
+        "root.e-auto." + USER0);
+    Assert.assertNotNull(user0);
+    Assert.assertTrue(user0.isDynamicQueue());
+    Assert.assertTrue(user0 instanceof LeafQueue);
+
+    LeafQueue user0LeafQueue = (LeafQueue)user0;
+
+    // Assert user limit factor is -1
+    Assert.assertTrue(user0LeafQueue.getUserLimitFactor() == -1);
+
+    // Assert user max applications not limited
+    Assert.assertEquals(user0LeafQueue.getMaxApplicationsPerUser(),
+        user0LeafQueue.getMaxApplications());
+
+    // Assert AM Resource
+    Assert.assertEquals(user0LeafQueue.getAMResourceLimit().getMemorySize(),
+        user0LeafQueue.getMaxAMResourcePerQueuePercent()*MAX_MEMORY*GB, 1e-6);
+
+    // Assert user limit (no limit) when limit factor is -1
+    Assert.assertEquals(MAX_MEMORY*GB,
+        user0LeafQueue.getEffectiveMaxCapacityDown("",
+            user0LeafQueue.getMinimumAllocation()).getMemorySize(), 1e-6);
+  }
 
   private LeafQueue createQueue(String queuePath) throws YarnException {
     return autoQueueHandler.autoCreateQueue(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 0c9799d..889da07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -1437,6 +1437,114 @@ public class TestLeafQueue {
   }
 
   @Test
+  public void testDisabledUserLimitFactor() throws Exception {
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    //unset maxCapacity
+    a.setMaxCapacity(1.0f);
+
+    when(csContext.getClusterResource())
+        .thenReturn(Resources.createResource(16 * GB, 32));
+
+    // Users
+    final String user0 = "user0";
+    final String user1 = "user1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app0 =
+        new FiCaSchedulerApp(appAttemptId0, user0, a,
+            a.getAbstractUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app0, user0);
+
+    final ApplicationAttemptId appAttemptId1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app1 =
+        new FiCaSchedulerApp(appAttemptId1, user1, a,
+            a.getAbstractUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app1, user1); // different user
+
+    // Setup some nodes
+    String host0 = "127.0.0.1";
+    FiCaSchedulerNode node0 =
+        TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8*GB);
+    String host1 = "127.0.0.2";
+    FiCaSchedulerNode node1 =
+        TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8*GB);
+
+    final int numNodes = 2;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (8*GB), numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app0.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true,
+            priority, recordFactory)));
+
+    app1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+            priority, recordFactory)));
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
+        app1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(),
+        node0, node1.getNodeID(), node1);
+
+    /**
+     * Start testing ...
+     */
+    a.setUserLimitFactor(1);
+    a.setUserLimit(50);
+
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    // There're two active users
+    assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
+
+    // 1 container to user0
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    assertEquals(3*GB, a.getUsedResources().getMemorySize());
+    assertEquals(3*GB, app0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app1.getCurrentConsumption().getMemorySize());
+
+    // Allocate one container to app1. Even if app0
+    // submit earlier, it cannot get this container assigned since user0
+    // exceeded user-limit already.
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    assertEquals(4*GB, a.getUsedResources().getMemorySize());
+    assertEquals(3*GB, app0.getCurrentConsumption().getMemorySize());
+    assertEquals(1*GB, app1.getCurrentConsumption().getMemorySize());
+
+    // Set to -1 , disabled user limit factor
+    // There will be not limited
+    a.setUserLimitFactor(-1);
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    assertEquals(7*GB, a.getUsedResources().getMemorySize());
+    assertEquals(6*GB, app0.getCurrentConsumption().getMemorySize());
+    assertEquals(1*GB, app1.getCurrentConsumption().getMemorySize());
+
+  }
+
+  @Test
   public void testUserLimits() throws Exception {
     // Mock the queue
     LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
@@ -1497,7 +1605,7 @@ public class TestLeafQueue {
     /**
      * Start testing...
      */
-    
+
     // Set user-limit
     a.setUserLimit(50);
     a.setUserLimitFactor(2);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org