You are viewing a plain text version of this content. (The canonical link to the original message was not preserved in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2022/06/20 13:04:08 UTC
[hadoop] branch trunk updated: YARN-11185. Pending app metrics are increased doubly when a queue reaches its max-parallel-apps limit. Contributed by Andras Gyori
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3a66348fda2 YARN-11185. Pending app metrics are increased doubly when a queue reaches its max-parallel-apps limit. Contributed by Andras Gyori
3a66348fda2 is described below
commit 3a66348fda29b18054853baed6664fec92dc5cc5
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Mon Jun 20 15:03:58 2022 +0200
YARN-11185. Pending app metrics are increased doubly when a queue reaches its max-parallel-apps limit. Contributed by Andras Gyori
---
.../scheduler/capacity/AbstractLeafQueue.java | 4 +-
.../capacity/TestCapacitySchedulerApps.java | 77 ++++++++++++++--------
2 files changed, 51 insertions(+), 30 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
index ac5c8a15167..08fedb578ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
@@ -578,6 +578,8 @@ public class AbstractLeafQueue extends AbstractCSQueue {
public void submitApplicationAttempt(FiCaSchedulerApp application,
String userName, boolean isMoveApp) {
// Careful! Locking order is important!
+ boolean isAppAlreadySubmitted = applicationAttemptMap.containsKey(
+ application.getApplicationAttemptId());
writeLock.lock();
try {
// TODO, should use getUser, use this method just to avoid UT failure
@@ -591,7 +593,7 @@ public class AbstractLeafQueue extends AbstractCSQueue {
}
// We don't want to update metrics for move app
- if (!isMoveApp) {
+ if (!isMoveApp && !isAppAlreadySubmitted) {
boolean unmanagedAM = application.getAppSchedulingInfo() != null &&
application.getAppSchedulingInfo().isUnmanagedAM();
usageTracker.getMetrics().submitAppAttempt(userName, unmanagedAM);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java
index ea22c24b355..d192e7dcc69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -116,6 +117,8 @@ import static org.mockito.Mockito.when;
public class TestCapacitySchedulerApps {
+ public static final int MAX_PARALLEL_APPS = 5;
+ public static final String USER_0 = "user_0";
private ResourceManager resourceManager = null;
private RMContext mockContext;
@@ -237,18 +240,7 @@ public class TestCapacitySchedulerApps {
YarnScheduler scheduler = rm.getResourceScheduler();
// submit an app
- MockRMAppSubmissionData data =
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-1")
- .withUser("user_0")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app = MockRMAppSubmitter.submit(rm, data);
- ApplicationAttemptId appAttemptId =
- rm.getApplicationReport(app.getApplicationId())
- .getCurrentApplicationAttemptId();
+ ApplicationAttemptId appAttemptId = submitApp(rm);
// check preconditions
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
@@ -1020,18 +1012,7 @@ public class TestCapacitySchedulerApps {
(AbstractYarnScheduler) rm.getResourceScheduler();
// submit an app
- MockRMAppSubmissionData data =
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-1")
- .withUser("user_0")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app = MockRMAppSubmitter.submit(rm, data);
- ApplicationAttemptId appAttemptId =
- rm.getApplicationReport(app.getApplicationId())
- .getCurrentApplicationAttemptId();
+ ApplicationAttemptId appAttemptId = submitApp(rm);
// check preconditions
assertOneAppInQueue(scheduler, "a1");
@@ -1057,23 +1038,61 @@ public class TestCapacitySchedulerApps {
}
@Test
- public void testMoveAllAppsInvalidDestination() throws Exception {
+ public void testMaxParallelAppsPendingQueueMetrics() throws Exception {
MockRM rm = setUpMove();
ResourceScheduler scheduler = rm.getResourceScheduler();
+ CapacityScheduler cs = (CapacityScheduler) scheduler;
+ cs.getQueueContext().getConfiguration().setInt(CapacitySchedulerConfiguration.getQueuePrefix(A1)
+ + CapacitySchedulerConfiguration.MAX_PARALLEL_APPLICATIONS, MAX_PARALLEL_APPS);
+ cs.reinitialize(cs.getQueueContext().getConfiguration(), mockContext);
+ List<ApplicationAttemptId> attemptIds = new ArrayList<>();
+
+ for (int i = 0; i < 2 * MAX_PARALLEL_APPS; i++) {
+ attemptIds.add(submitApp(rm));
+ }
+
+ // Finish first batch to allow the other batch to run
+ for (int i = 0; i < MAX_PARALLEL_APPS; i++) {
+ cs.handle(new AppAttemptRemovedSchedulerEvent(attemptIds.get(i),
+ RMAppAttemptState.FINISHED, true));
+ }
+
+ // Finish the remaining apps
+ for (int i = MAX_PARALLEL_APPS; i < 2 * MAX_PARALLEL_APPS; i++) {
+ cs.handle(new AppAttemptRemovedSchedulerEvent(attemptIds.get(i),
+ RMAppAttemptState.FINISHED, true));
+ }
+
+ Assert.assertEquals("No pending app should remain for root queue", 0,
+ cs.getRootQueueMetrics().getAppsPending());
+ Assert.assertEquals("No running application should remain for root queue", 0,
+ cs.getRootQueueMetrics().getAppsRunning());
+
+ rm.stop();
+ }
+ private ApplicationAttemptId submitApp(MockRM rm) throws Exception {
// submit an app
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("test-move-1")
- .withUser("user_0")
+ .withUser(USER_0)
.withAcls(null)
.withQueue("a1")
.withUnmanagedAM(false)
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data);
- ApplicationAttemptId appAttemptId =
- rm.getApplicationReport(app.getApplicationId())
- .getCurrentApplicationAttemptId();
+ return rm.getApplicationReport(app.getApplicationId())
+ .getCurrentApplicationAttemptId();
+ }
+
+ @Test
+ public void testMoveAllAppsInvalidDestination() throws Exception {
+ MockRM rm = setUpMove();
+ ResourceScheduler scheduler = rm.getResourceScheduler();
+
+ // submit an app
+ ApplicationAttemptId appAttemptId = submitApp(rm);
// check preconditions
assertApps(scheduler, "root", appAttemptId);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org