Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2017/12/05 00:31:24 UTC

[2/2] hadoop git commit: YARN-7496. CS Intra-queue preemption user-limit calculations are not in line with LeafQueue user-limit calculations. (Eric Payne via wangda)

YARN-7496. CS Intra-queue preemption user-limit calculations are not in line with LeafQueue user-limit calculations. (Eric Payne via wangda)

Change-Id: I4dada78a227408a1f2d9bc18099041aad81d67d7
(cherry picked from commit dd471425652f670786bafa03537526c19180d5ee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3bc0d0c3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3bc0d0c3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3bc0d0c3

Branch: refs/heads/branch-2.8.3
Commit: 3bc0d0c38911dd24fc39beb748099b6259f489e3
Parents: 370e08c
Author: Wangda Tan <wa...@apache.org>
Authored: Thu Nov 23 20:18:22 2017 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Mon Dec 4 16:26:04 2017 -0800

----------------------------------------------------------------------
 .../scheduler/capacity/LeafQueue.java           | 37 +++-----
 ...cityPreemptionPolicyIntraQueueUserLimit.java | 42 +++++++++
 .../scheduler/capacity/TestLeafQueue.java       | 97 ++++++++++++++++++++
 3 files changed, 152 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bc0d0c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 40b7e0e..3915b1e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -142,7 +142,6 @@ public class LeafQueue extends AbstractCSQueue {
   private Set<String> activeUsersSet =
       Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
   private float activeUsersTimesWeights = 0.0f;
-  private float allUsersTimesWeights = 0.0f;
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public LeafQueue(CapacitySchedulerContext cs,
@@ -307,7 +306,6 @@ public class LeafQueue extends AbstractCSQueue {
       ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
     }
     activeUsersTimesWeights = sumActiveUsersTimesWeights();
-    allUsersTimesWeights = sumAllUsersTimesWeights();
   }
 
   /**
@@ -450,7 +448,6 @@ public class LeafQueue extends AbstractCSQueue {
       user = new User(userName);
       users.put(userName, user);
       user.setWeight(getUserWeightFromQueue(userName));
-      allUsersTimesWeights = sumAllUsersTimesWeights();
     }
     return user;
   }
@@ -871,7 +868,6 @@ public class LeafQueue extends AbstractCSQueue {
     user.finishApplication(wasActive);
     if (user.getTotalApplications() == 0) {
       users.remove(application.getUser());
-      allUsersTimesWeights = sumAllUsersTimesWeights();
     }
 
     // Check if we can activate more applications
@@ -1298,18 +1294,20 @@ public class LeafQueue extends AbstractCSQueue {
     // Also, the queue's configured capacity should be higher than 
     // queue-hard-limit * ulMin
 
-    float usersSummedByWeight;
-    if (forActive) {
-      if (activeUsersManager.getActiveUsersChanged()) {
-        activeUsersSet = activeUsersManager.getActiveUsersSet();
-        activeUsersTimesWeights  = sumActiveUsersTimesWeights();
-        activeUsersManager.clearActiveUsersChanged();
-      }
-      usersSummedByWeight = activeUsersTimesWeights;
-    } else {
-      usersSummedByWeight = allUsersTimesWeights;
+    if (activeUsersManager.getActiveUsersChanged()) {
+      activeUsersSet = activeUsersManager.getActiveUsersSet();
+      activeUsersTimesWeights  = sumActiveUsersTimesWeights();
+      activeUsersManager.clearActiveUsersChanged();
     }
-    
+    float usersSummedByWeight = activeUsersTimesWeights;
+
+    // Align the preemption algorithm with the assignment algorithm.
+    // If calculating for preemption and the user is not active, calculate the
+    // limit as if the user will be preempted (since that will make it active).
+    if (!forActive && !activeUsersSet.contains(userName)) {
+      usersSummedByWeight = activeUsersTimesWeights + user.getWeight();
+    }
+
     // User limit resource is determined by:
     // max(currentCapacity / #activeUsers, currentCapacity *
     // user-limit-percentage%)
@@ -1394,15 +1392,6 @@ public class LeafQueue extends AbstractCSQueue {
     }
     return count;
   }
-
-  synchronized float sumAllUsersTimesWeights() {
-    float count = 0.0f;
-    for (String userName : users.keySet()) {
-      User user = getUser(userName);
-      count += user.getWeight();
-    }
-    return count;
-  }
   
   @Private
   protected synchronized boolean canAssignToUser(Resource clusterResource,
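
For readers skimming the hunk above, here is a minimal standalone sketch of the user-limit arithmetic this change aligns on. The class name, method name, and plain-float types are illustrative assumptions only; the real LeafQueue code works on Resource objects and per-partition capacities.

// Illustrative sketch only -- not the actual LeafQueue implementation.
public final class UserLimitSketch {
  /**
   * currentCapacity:          resources currently available to the queue
   * activeUsersTimesWeights:  summed weights of users with pending requests
   * userLimitPercent:         the queue's minimum-user-limit-percent (e.g. 33)
   */
  static float computeUserLimit(float currentCapacity,
      float activeUsersTimesWeights, float userLimitPercent,
      boolean forActive, boolean userIsActive, float userWeight) {
    float usersSummedByWeight = activeUsersTimesWeights;
    // The fix: when calculating for preemption (forActive == false) on
    // behalf of a user that is not currently active, count that user as if
    // it were active, since preempting will activate it. Before this patch,
    // the preemption path divided by the summed weight of *all* users,
    // which disagreed with the assignment path.
    if (!forActive && !userIsActive) {
      usersSummedByWeight += userWeight;
    }
    // User limit = max(currentCapacity / #activeUsers (weighted),
    //                  currentCapacity * user-limit-percentage%)
    return Math.max(currentCapacity / usersSummedByWeight,
        currentCapacity * userLimitPercent / 100f);
  }
}

With the old code, an inactive user's preemption limit was diluted by every user the queue had ever seen; with the fix, both paths divide by the same weighted active-user count.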

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bc0d0c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
index 0440db3..006c3f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
@@ -931,4 +931,46 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
+
+  @Test
+  public void testSimpleUserLimitIntraQueuePreemptionNoOverPreemption()
+      throws IOException {
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 1 0]);" + // root
+            "-a(=[100 100 100 1 0])"; // a
+
+    // When preempting within a queue, it should preempt no more than what the
+    // LeafQueue will then re-assign. In this use case, containers will be
+    // preempted from app1, which will cause user1 and user3 to be active.
+    // LeafQueue will calculate MULP to be 50% because
+    // (100 resources / 2 active users) = 50. We expect 14 containers to be
+    // preempted.
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending,user)
+    //          \tMULP
+    String appsConfig =
+        "a\t" // app1 in a
+            + "(1,1,n1,,65,false,0,user1)\t50;" +
+            "a\t" // app2 in a
+            + "(1,1,n1,,35,false,0,user2)\t50;" +
+            "a\t" // app3 in a
+            + "(1,1,n1,,0,false,35,user3)\t50"
+            ;
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // app1 is over the 50GB user limit. Expect 14 containers to be preempted
+    // from app1 (65 used - 51, the user limit plus a one-container buffer).
+    verify(mDisp, times(14)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+  }
 }
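
As a sanity check on the times(14) expectation in the new test, the arithmetic from its comments (a hypothetical standalone check, not part of the patch):

// Hypothetical back-of-envelope check of the expected preemption count;
// it mirrors the comments in the test above, not the policy's actual code path.
public class ExpectedPreemptionCount {
  public static void main(String[] args) {
    int queueResources = 100; // queue a: guaranteed = max = used = 100
    int activeUsers = 2;      // user1 (re-requesting after preemption) + user3
    int userLimit = queueResources / activeUsers; // MULP = 50
    int bufferContainers = 1; // the scheduler leaves a one-container buffer
    int app1Used = 65;        // user1 holds 65 one-unit containers
    System.out.println(app1Used - (userLimit + bufferContainers)); // 14
  }
}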

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bc0d0c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index e4fe03d..23fb29d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -3503,4 +3503,101 @@ public class TestLeafQueue {
     assertEquals(1*GB,
         app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize());
   }
+
+  @Test
+  public void testGetResourceLimitForAllUsers() throws Exception {
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    // Set minimum-user-limit-percent for queue "a" so 3 users can be active.
+    csConf.setUserLimit(a.getQueuePath(), 33);
+    // Make sure a single user can consume the entire cluster.
+    csConf.setUserLimitFactor(a.getQueuePath(), 15);
+    csConf.setMaximumCapacity(a.getQueuePath(), 100);
+
+    when(csContext.getClusterResource())
+        .thenReturn(Resources.createResource(100 * GB, 192));
+    a.reinitialize(a, csContext.getClusterResource());
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+    final String user_2 = "user_2";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            a.getActiveUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_1, a,
+            a.getActiveUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_1, user_1); // different user
+
+    final ApplicationAttemptId appAttemptId_2 =
+        TestUtils.getMockApplicationAttemptId(2, 0);
+    FiCaSchedulerApp app_2 =
+        new FiCaSchedulerApp(appAttemptId_2, user_2, a,
+            a.getActiveUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_2, user_2); // different user
+
+    // Setup some nodes
+    String host_0 = "127.0.0.1";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 100*GB);
+
+    final int numNodes = 1;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (100*GB), numNodes * 128);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // user_0 consumes 65% of the queue
+    Priority priority = TestUtils.createMockPriority(1);
+    for (int i=0; i < 65; i++) {
+      app_0.updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
+              priority, recordFactory)));
+      a.assignContainers(clusterResource, node_0,
+          new ResourceLimits(clusterResource),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    }
+    assertEquals(65*GB, app_0.getCurrentConsumption().getMemorySize());
+
+    // With minimum-user-limit-percent set to 33%, the capacity scheduler
+    // will assign as much as 35% of the resources to each user, since it
+    // leaves a slight buffer above each user's computed limit.
+    for (int i=0; i < 35; i++) {
+      app_1.updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
+              priority, recordFactory)));
+
+      a.assignContainers(clusterResource, node_0,
+          new ResourceLimits(clusterResource),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    }
+    assertEquals(35*GB, app_1.getCurrentConsumption().getMemorySize());
+    assertEquals(0, a.getActiveUsersManager().getNumActiveUsers());
+
+    app_0.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 35, true,
+            priority, recordFactory)));
+    a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
+    assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
+
+    // With one user over its user limit and actively requesting more
+    // resources (user_0), one user exactly at its user limit guarantee
+    // (user_1), and one inactive user on whose behalf preemption is being
+    // calculated (user_2), preemption should calculate the user limit to be
+    // 50% of the resources. Since the capacity scheduler leaves a buffer of
+    // 1 container, the resource limit calculated for preemption is 51GB.
+    Resource ulForallUsers = a.getResourceLimitForAllUsers(user_2,
+        clusterResource, "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(51*GB, ulForallUsers.getMemorySize());
+  }
 }
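
Plugging this test's numbers into the same max(capacity / weighted-active-users, capacity * ulMin%) formula sketched after the LeafQueue.java hunk (a hypothetical check, not actual scheduler code):

// Hypothetical check of the 51GB expectation above. user_0 is the one
// active user; user_2 is counted as if preemption activates it, giving two
// weighted users. minimum-user-limit-percent is 33.
public class TestLeafQueueExpectation {
  public static void main(String[] args) {
    float clusterGb = 100f;
    float userLimitGb = Math.max(clusterGb / 2f, clusterGb * 33f / 100f); // 50
    float bufferGb = 1f; // one-container (1GB) buffer, per the comment above
    System.out.println(userLimitGb + bufferGb); // 51.0, matching the assert
  }
}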

