You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bt...@apache.org on 2022/05/11 16:05:15 UTC

[hadoop] branch trunk updated: YARN-11114. RMWebServices returns only apps matching exactly the submitted queue name. Contributed by Szilard Nemeth

This is an automated email from the ASF dual-hosted git repository.

bteke pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9af3eabdca2 YARN-11114. RMWebServices returns only apps matching exactly the submitted queue name. Contributed by Szilard Nemeth
9af3eabdca2 is described below

commit 9af3eabdca2e246dbeb583cbc89d51e89ef11ecb
Author: Szilard Nemeth <sz...@gmail.com>
AuthorDate: Wed Apr 20 19:39:47 2022 +0200

    YARN-11114. RMWebServices returns only apps matching exactly the submitted queue name. Contributed by Szilard Nemeth
---
 .../server/resourcemanager/ClientRMService.java    |  29 +++-
 .../resourcemanager/TestClientRMService.java       |   4 +-
 .../webapp/TestRMWebServicesApps.java              | 150 ++++++++++++++++++++-
 3 files changed, 178 insertions(+), 5 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 04efc886413..6c37b7e9c0a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -37,7 +37,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-
 import org.apache.commons.cli.UnrecognizedOptionException;
 import org.apache.commons.lang3.Range;
 import org.slf4j.Logger;
@@ -913,7 +912,17 @@ public class ClientRMService extends AbstractService implements
       }
 
       if (queues != null && !queues.isEmpty()) {
-        if (!queues.contains(application.getQueue())) {
+        Map<String, List<RMApp>> foundApps = queryApplicationsByQueues(apps, queues);
+        List<RMApp> runningAppsByQueues = foundApps.entrySet().stream()
+            .filter(e -> queues.contains(e.getKey()))
+            .map(Map.Entry::getValue)
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
+        List<RMApp> runningAppsById = runningAppsByQueues.stream()
+            .filter(app -> app.getApplicationId().equals(application.getApplicationId()))
+            .collect(Collectors.toList());
+
+        if (runningAppsById.isEmpty() && !queues.contains(application.getQueue())) {
           continue;
         }
       }
@@ -992,6 +1001,22 @@ public class ClientRMService extends AbstractService implements
     return response;
   }
 
+  private Map<String, List<RMApp>> queryApplicationsByQueues(
+      Map<ApplicationId, RMApp> apps, Set<String> queues) {
+    final Map<String, List<RMApp>> appsToQueues = new HashMap<>();
+    for (String queue : queues) {
+      List<ApplicationAttemptId> appsInQueue = scheduler.getAppsInQueue(queue);
+      if (appsInQueue != null && !appsInQueue.isEmpty()) {
+        for (ApplicationAttemptId appAttemptId : appsInQueue) {
+          RMApp rmApp = apps.get(appAttemptId.getApplicationId());
+          appsToQueues.putIfAbsent(queue, new ArrayList<>());
+          appsToQueues.get(queue).add(rmApp);
+        }
+      }
+    }
+    return appsToQueues;
+  }
+
   private Set<String> getLowerCasedAppTypes(GetApplicationsRequest request) {
     Set<String> applicationTypes = new HashSet<>();
     if (request.getApplicationTypes() != null && !request.getApplicationTypes()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 7806845a2ed..9f4e9433b14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -1402,9 +1402,9 @@ public class TestClientRMService {
     request.setQueues(queueSet);
 
     queueSet.add(queues[0]);
-    assertEquals("Incorrect number of applications in queue", 2,
+    assertEquals("Incorrect number of applications in queue", 3,
         rmService.getApplications(request).getApplicationList().size());
-    assertEquals("Incorrect number of applications in queue", 2,
+    assertEquals("Incorrect number of applications in queue", 3,
         rmService.getApplications(request).getApplicationList().size());
 
     queueSet.add(queues[1]);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java
index e22311ccbda..4c859baf789 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
@@ -82,6 +83,16 @@ public class TestRMWebServicesApps extends JerseyTestBase {
   private static final int CONTAINER_MB = 1024;
 
   private static class WebServletModule extends ServletModule {
+    private final Class<? extends AbstractYarnScheduler> scheduler;
+
+    public WebServletModule() {
+      this.scheduler = FifoScheduler.class;
+    }
+
+    public WebServletModule(Class<? extends AbstractYarnScheduler> scheduler) {
+      this.scheduler = scheduler;
+    }
+
     @Override
     protected void configureServlets() {
       bind(JAXBContextResolver.class);
@@ -90,7 +101,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
       Configuration conf = new Configuration();
       conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
           YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
-      conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
+      conf.setClass(YarnConfiguration.RM_SCHEDULER, scheduler,
           ResourceScheduler.class);
       rm = new MockRM(conf);
       bind(ResourceManager.class).toInstance(rm);
@@ -1970,5 +1981,142 @@ public class TestRMWebServicesApps extends JerseyTestBase {
         enforceExecutionType);
   }
 
+  @Test
+  public void testAppsQueryByQueueShortname() throws Exception {
+    GuiceServletConfig.setInjector(
+        Guice.createInjector(new WebServletModule(CapacityScheduler.class)));
+
+    rm.start();
+    MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
+    //YARN-11114 - Finished apps can only be queried with exactly the
+    // same queue name that the app is submitted to.
+    //As the queue is 'root.default'  and the query is 'default' here,
+    // this app won't be returned.
+    RMApp finishedApp1 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder
+            .createWithMemory(CONTAINER_MB, rm)
+            .withQueue("root.default")
+            .build());
+    RMApp finishedApp2 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder
+            .createWithMemory(CONTAINER_MB, rm)
+            .withQueue("default")
+            .build());
+
+    RMApp runningApp1 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder
+            .createWithMemory(CONTAINER_MB, rm)
+            .withQueue("default")
+            .build());
+    RMApp runningApp2 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder
+            .createWithMemory(CONTAINER_MB, rm)
+            .withQueue("root.default")
+            .build());
+    amNodeManager.nodeHeartbeat(true);
+    finishApp(amNodeManager, finishedApp1);
+    amNodeManager.nodeHeartbeat(true);
+    finishApp(amNodeManager, finishedApp2);
+
+    WebResource r = resource();
+
+    ClientResponse response = r.path("ws").path("v1").path("cluster")
+        .path("apps")
+        .queryParam("queue", "default")
+        .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+        response.getType().toString());
+    JSONObject json = response.getEntity(JSONObject.class);
+    assertEquals("incorrect number of elements", 1, json.length());
+    JSONObject apps = json.getJSONObject("apps");
+    assertEquals("incorrect number of elements", 1, apps.length());
+
+    JSONArray array = apps.getJSONArray("app");
+
+    Set<String> appIds = getApplicationIds(array);
+    assertTrue("Running app 1 should be in the result list!",
+        appIds.contains(runningApp1.getApplicationId().toString()));
+    assertTrue("Running app 2 should be in the result list!",
+        appIds.contains(runningApp2.getApplicationId().toString()));
+    assertFalse("Finished app 1 should not be in the result list " +
+            "as it was submitted to 'root.default' but the query is for 'default'",
+        appIds.contains(finishedApp1.getApplicationId().toString()));
+    assertTrue("Finished app 2 should be in the result list " +
+            "as it was submitted to 'default' and the query is exactly for 'default'",
+        appIds.contains(finishedApp2.getApplicationId().toString()));
+    assertEquals("incorrect number of elements", 3, array.length());
+
+    rm.stop();
+  }
+
+  @Test
+  public void testAppsQueryByQueueFullname() throws Exception {
+    GuiceServletConfig.setInjector(
+        Guice.createInjector(new WebServletModule(CapacityScheduler.class)));
+
+    rm.start();
+    MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
+    RMApp finishedApp1 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder
+            .createWithMemory(CONTAINER_MB, rm)
+            .withQueue("root.default")
+            .build());
+    //YARN-11114 - Finished apps can only be queried with exactly the
+    // same queue name that the app is submitted to.
+    //As the queue is 'default'  and the query is 'root.default' here,
+    // this app won't be returned,
+    RMApp finishedApp2 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder
+            .createWithMemory(CONTAINER_MB, rm)
+            .withQueue("default")
+            .build());
+
+    RMApp runningApp1 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder
+            .createWithMemory(CONTAINER_MB, rm)
+            .withQueue("default")
+            .build());
+    RMApp runningApp2 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder
+            .createWithMemory(CONTAINER_MB, rm)
+            .withQueue("root.default")
+            .build());
+    amNodeManager.nodeHeartbeat(true);
+    finishApp(amNodeManager, finishedApp1);
+
+    amNodeManager.nodeHeartbeat(true);
+    finishApp(amNodeManager, finishedApp2);
+
+    WebResource r = resource();
+
+    ClientResponse response = r.path("ws").path("v1").path("cluster")
+        .path("apps")
+        .queryParam("queue", "root.default")
+        .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+        response.getType().toString());
+    JSONObject json = response.getEntity(JSONObject.class);
+    assertEquals("incorrect number of elements", 1, json.length());
+    JSONObject apps = json.getJSONObject("apps");
+    assertEquals("incorrect number of elements", 1, apps.length());
+
+    JSONArray array = apps.getJSONArray("app");
+
+    Set<String> appIds = getApplicationIds(array);
+    assertTrue("Running app 1 should be in the result list!",
+        appIds.contains(runningApp1.getApplicationId().toString()));
+    assertTrue("Running app 2 should be in the result list!",
+        appIds.contains(runningApp2.getApplicationId().toString()));
+    assertTrue("Finished app 1 should be in the result list, " +
+            "as it was submitted to 'root.default' and the query is exactly for 'root.default'!",
+        appIds.contains(finishedApp1.getApplicationId().toString()));
+    assertFalse("Finished app 2 should not be in the result list, " +
+            "as it was submitted to 'default' but the query is for 'root.default'!",
+        appIds.contains(finishedApp2.getApplicationId().toString()));
+    assertEquals("incorrect number of elements", 3, array.length());
+
+    rm.stop();
+  }
+
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org