You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2022/06/20 13:04:08 UTC

[hadoop] branch trunk updated: YARN-11185. Pending app metrics are increased doubly when a queue reaches its max-parallel-apps limit. Contributed by Andras Gyori

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3a66348fda2 YARN-11185. Pending app metrics are increased doubly when a queue reaches its max-parallel-apps limit. Contributed by Andras Gyori
3a66348fda2 is described below

commit 3a66348fda29b18054853baed6664fec92dc5cc5
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Mon Jun 20 15:03:58 2022 +0200

    YARN-11185. Pending app metrics are increased doubly when a queue reaches its max-parallel-apps limit. Contributed by Andras Gyori
---
 .../scheduler/capacity/AbstractLeafQueue.java      |  4 +-
 .../capacity/TestCapacitySchedulerApps.java        | 77 ++++++++++++++--------
 2 files changed, 51 insertions(+), 30 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
index ac5c8a15167..08fedb578ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
@@ -578,6 +578,8 @@ public class AbstractLeafQueue extends AbstractCSQueue {
   public void submitApplicationAttempt(FiCaSchedulerApp application,
       String userName, boolean isMoveApp) {
     // Careful! Locking order is important!
+    boolean isAppAlreadySubmitted = applicationAttemptMap.containsKey(
+        application.getApplicationAttemptId());
     writeLock.lock();
     try {
       // TODO, should use getUser, use this method just to avoid UT failure
@@ -591,7 +593,7 @@ public class AbstractLeafQueue extends AbstractCSQueue {
     }
 
     // We don't want to update metrics for move app
-    if (!isMoveApp) {
+    if (!isMoveApp && !isAppAlreadySubmitted) {
       boolean unmanagedAM = application.getAppSchedulingInfo() != null &&
           application.getAppSchedulingInfo().isUnmanagedAM();
       usageTracker.getMetrics().submitAppAttempt(userName, unmanagedAM);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java
index ea22c24b355..d192e7dcc69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -116,6 +117,8 @@ import static org.mockito.Mockito.when;
 
 public class TestCapacitySchedulerApps {
 
+  public static final int MAX_PARALLEL_APPS = 5;
+  public static final String USER_0 = "user_0";
   private ResourceManager resourceManager = null;
   private RMContext mockContext;
 
@@ -237,18 +240,7 @@ public class TestCapacitySchedulerApps {
     YarnScheduler scheduler = rm.getResourceScheduler();
 
     // submit an app
-    MockRMAppSubmissionData data =
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-1")
-            .withUser("user_0")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app = MockRMAppSubmitter.submit(rm, data);
-    ApplicationAttemptId appAttemptId =
-        rm.getApplicationReport(app.getApplicationId())
-            .getCurrentApplicationAttemptId();
+    ApplicationAttemptId appAttemptId = submitApp(rm);
 
     // check preconditions
     List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
@@ -1020,18 +1012,7 @@ public class TestCapacitySchedulerApps {
         (AbstractYarnScheduler) rm.getResourceScheduler();
 
     // submit an app
-    MockRMAppSubmissionData data =
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-1")
-            .withUser("user_0")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app = MockRMAppSubmitter.submit(rm, data);
-    ApplicationAttemptId appAttemptId =
-        rm.getApplicationReport(app.getApplicationId())
-            .getCurrentApplicationAttemptId();
+    ApplicationAttemptId appAttemptId = submitApp(rm);
 
     // check preconditions
     assertOneAppInQueue(scheduler, "a1");
@@ -1057,23 +1038,61 @@ public class TestCapacitySchedulerApps {
   }
 
   @Test
-  public void testMoveAllAppsInvalidDestination() throws Exception {
+  public void testMaxParallelAppsPendingQueueMetrics() throws Exception {
     MockRM rm = setUpMove();
     ResourceScheduler scheduler = rm.getResourceScheduler();
+    CapacityScheduler cs = (CapacityScheduler) scheduler;
+    cs.getQueueContext().getConfiguration().setInt(CapacitySchedulerConfiguration.getQueuePrefix(A1)
+        + CapacitySchedulerConfiguration.MAX_PARALLEL_APPLICATIONS, MAX_PARALLEL_APPS);
+    cs.reinitialize(cs.getQueueContext().getConfiguration(), mockContext);
+    List<ApplicationAttemptId> attemptIds = new ArrayList<>();
+
+    for (int i = 0; i < 2 * MAX_PARALLEL_APPS; i++) {
+      attemptIds.add(submitApp(rm));
+    }
+
+    // Finish first batch to allow the other batch to run
+    for (int i = 0; i < MAX_PARALLEL_APPS; i++) {
+      cs.handle(new AppAttemptRemovedSchedulerEvent(attemptIds.get(i),
+          RMAppAttemptState.FINISHED, true));
+    }
+
+    // Finish the remaining apps
+    for (int i = MAX_PARALLEL_APPS; i < 2 * MAX_PARALLEL_APPS; i++) {
+      cs.handle(new AppAttemptRemovedSchedulerEvent(attemptIds.get(i),
+          RMAppAttemptState.FINISHED, true));
+    }
+
+    Assert.assertEquals("No pending app should remain for root queue", 0,
+        cs.getRootQueueMetrics().getAppsPending());
+    Assert.assertEquals("No running application should remain for root queue", 0,
+        cs.getRootQueueMetrics().getAppsRunning());
+
+    rm.stop();
+  }
 
+  private ApplicationAttemptId submitApp(MockRM rm) throws Exception {
     // submit an app
     MockRMAppSubmissionData data =
         MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
             .withAppName("test-move-1")
-            .withUser("user_0")
+            .withUser(USER_0)
             .withAcls(null)
             .withQueue("a1")
             .withUnmanagedAM(false)
             .build();
     RMApp app = MockRMAppSubmitter.submit(rm, data);
-    ApplicationAttemptId appAttemptId =
-        rm.getApplicationReport(app.getApplicationId())
-            .getCurrentApplicationAttemptId();
+    return rm.getApplicationReport(app.getApplicationId())
+        .getCurrentApplicationAttemptId();
+  }
+
+  @Test
+  public void testMoveAllAppsInvalidDestination() throws Exception {
+    MockRM rm = setUpMove();
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+
+    // submit an app
+    ApplicationAttemptId appAttemptId = submitApp(rm);
 
     // check preconditions
     assertApps(scheduler, "root", appAttemptId);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org