Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2021/07/23 11:29:35 UTC

[GitHub] [hadoop] bibinchundatt commented on a change in pull request #3135: YARN-10829. Support getApplications API in FederationClientInterceptor

bibinchundatt commented on a change in pull request #3135:
URL: https://github.com/apache/hadoop/pull/3135#discussion_r665334747



##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
##########
@@ -52,4 +63,131 @@ public static GetClusterMetricsResponse merge(
     }
     return GetClusterMetricsResponse.newInstance(tmp);
   }
+
+  /**
+   * Merges a list of ApplicationReports grouping by ApplicationId.
+   * Our current policy is to merge the application reports from the reachable
+   * SubClusters.
+   * @param responses a list of ApplicationResponse to merge
+   * @param returnPartialResult if the merge ApplicationReports should contain
+   * partial result or not
+   * @return the merged ApplicationsResponse
+   */
+  public static GetApplicationsResponse mergeApplications(
+      Collection<GetApplicationsResponse> responses,
+      boolean returnPartialResult){
+    Map<ApplicationId, ApplicationReport> federationAM = new HashMap<>();
+    Map<ApplicationId, ApplicationReport> federationUAMSum = new HashMap<>();
+
+    for (GetApplicationsResponse appResponse : responses){
+      for (ApplicationReport appReport : appResponse.getApplicationList()){
+        ApplicationId appId = appReport.getApplicationId();
+        // Check if this ApplicationReport is an AM
+        if (appReport.getHost() != null) {
+          // Insert in the list of AM
+          federationAM.put(appId, appReport);
+          // Check if there are any UAM found before
+          if (federationUAMSum.containsKey(appId)) {
+            // Merge the current AM with the found UAM
+            mergeAMWithUAM(appReport, federationUAMSum.get(appId));
+            // Remove the sum of the UAMs
+            federationUAMSum.remove(appId);
+          }
+          // This ApplicationReport is an UAM
+        } else if (federationAM.containsKey(appId)) {
+          // Merge the current UAM with its own AM
+          mergeAMWithUAM(federationAM.get(appId), appReport);
+        } else if (federationUAMSum.containsKey(appId)) {
+          // Merge the current UAM with its own UAM and update the list of UAM
+          ApplicationReport mergedUAMReport =
+              mergeUAMWithUAM(federationUAMSum.get(appId), appReport);
+          federationUAMSum.put(appId, mergedUAMReport);
+        } else {
+          // Insert in the list of UAM
+          federationUAMSum.put(appId, appReport);
+        }
+      }
+    }
+    // Check the remaining UAMs are depending or not from federation
+    for (ApplicationReport appReport : federationUAMSum.values()) {
+      if (mergeUamToReport(appReport.getName(), returnPartialResult)) {
+        federationAM.put(appReport.getApplicationId(), appReport);
+      }
+    }
+
+    return GetApplicationsResponse.newInstance(federationAM.values());
+  }
+
+  private static ApplicationReport mergeUAMWithUAM(ApplicationReport uam1,

Review comment:
       We are merging with mergeAMWithUAM(uam1, uam1); this will merge the record twice.
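
To illustrate one reading of the comment above: if an in-place merge is ever handed the same report as both arguments, every counter is added to itself and doubles. The sketch below is a minimal, self-contained illustration and not the code from the pull request. The `Usage` class, `merge` and `mergeDistinct` are hypothetical stand-ins that only mimic the additive style of mergeAMWithUAM; the identity guard is an assumption about one possible fix.

```java
final class SelfMergeSketch {

  /** Hypothetical stand-in for the counters carried by a usage report. */
  static final class Usage {
    int usedContainers;
    long memorySeconds;

    Usage(int usedContainers, long memorySeconds) {
      this.usedContainers = usedContainers;
      this.memorySeconds = memorySeconds;
    }
  }

  /** Adds the counters of {@code from} into {@code into}, in place. */
  static void merge(Usage into, Usage from) {
    into.usedContainers += from.usedContainers;
    into.memorySeconds += from.memorySeconds;
  }

  /** Same as merge, but ignores a call that passes one object twice. */
  static void mergeDistinct(Usage into, Usage from) {
    if (into == from) {
      return;
    }
    merge(into, from);
  }

  public static void main(String[] args) {
    Usage uam1 = new Usage(2, 100L);
    Usage uam2 = new Usage(3, 50L);

    mergeDistinct(uam1, uam2); // two different reports: counters are summed
    System.out.println(uam1.usedContainers + " " + uam1.memorySeconds);

    mergeDistinct(uam1, uam1); // same report twice: left unchanged
    System.out.println(uam1.usedContainers + " " + uam1.memorySeconds);

    merge(uam1, uam1); // unguarded self-merge: every counter doubles
    System.out.println(uam1.usedContainers + " " + uam1.memorySeconds);
  }
}
```

Whether a guard like this belongs in the helper or the call site should simply pass two distinct reports is a design choice for the PR author.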

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
##########
@@ -52,4 +63,131 @@ public static GetClusterMetricsResponse merge(
     }
     return GetClusterMetricsResponse.newInstance(tmp);
   }
+
+  /**
+   * Merges a list of ApplicationReports grouping by ApplicationId.
+   * Our current policy is to merge the application reports from the reachable
+   * SubClusters.
+   * @param responses a list of ApplicationResponse to merge
+   * @param returnPartialResult if the merge ApplicationReports should contain
+   * partial result or not
+   * @return the merged ApplicationsResponse
+   */
+  public static GetApplicationsResponse mergeApplications(
+      Collection<GetApplicationsResponse> responses,
+      boolean returnPartialResult){
+    Map<ApplicationId, ApplicationReport> federationAM = new HashMap<>();
+    Map<ApplicationId, ApplicationReport> federationUAMSum = new HashMap<>();
+
+    for (GetApplicationsResponse appResponse : responses){
+      for (ApplicationReport appReport : appResponse.getApplicationList()){
+        ApplicationId appId = appReport.getApplicationId();
+        // Check if this ApplicationReport is an AM
+        if (appReport.getHost() != null) {
+          // Insert in the list of AM
+          federationAM.put(appId, appReport);
+          // Check if there are any UAM found before
+          if (federationUAMSum.containsKey(appId)) {
+            // Merge the current AM with the found UAM
+            mergeAMWithUAM(appReport, federationUAMSum.get(appId));
+            // Remove the sum of the UAMs
+            federationUAMSum.remove(appId);
+          }
+          // This ApplicationReport is an UAM
+        } else if (federationAM.containsKey(appId)) {
+          // Merge the current UAM with its own AM
+          mergeAMWithUAM(federationAM.get(appId), appReport);
+        } else if (federationUAMSum.containsKey(appId)) {
+          // Merge the current UAM with its own UAM and update the list of UAM
+          ApplicationReport mergedUAMReport =
+              mergeUAMWithUAM(federationUAMSum.get(appId), appReport);
+          federationUAMSum.put(appId, mergedUAMReport);
+        } else {
+          // Insert in the list of UAM
+          federationUAMSum.put(appId, appReport);
+        }
+      }
+    }
+    // Check the remaining UAMs are depending or not from federation
+    for (ApplicationReport appReport : federationUAMSum.values()) {
+      if (mergeUamToReport(appReport.getName(), returnPartialResult)) {
+        federationAM.put(appReport.getApplicationId(), appReport);
+      }
+    }
+
+    return GetApplicationsResponse.newInstance(federationAM.values());
+  }
+
+  private static ApplicationReport mergeUAMWithUAM(ApplicationReport uam1,
+      ApplicationReport uam2){
+    uam1.setName(PARTIAL_REPORT + uam1.getApplicationId());
+    mergeAMWithUAM(uam1, uam2);
+    return uam1;
+  }
+
+  private static void mergeAMWithUAM(ApplicationReport am,
+      ApplicationReport uam){
+    ApplicationResourceUsageReport amResourceReport =

Review comment:
       To be on the safe side, we could add a null check for the ApplicationResourceUsageReport.
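
A minimal sketch of what such a null check could look like, using simplified stand-in types rather than the real YARN records. `Report`, `Usage` and `mergeUsage` are hypothetical names, and the policy chosen here (skip the merge when the UAM has no usage report, adopt a copy when the AM has none) is an assumption, not something stated in the pull request.

```java
final class NullSafeUsageMergeSketch {

  /** Hypothetical stand-in for ApplicationResourceUsageReport. */
  static final class Usage {
    int usedContainers;
    long vcoreSeconds;

    Usage(int usedContainers, long vcoreSeconds) {
      this.usedContainers = usedContainers;
      this.vcoreSeconds = vcoreSeconds;
    }
  }

  /** Hypothetical stand-in for ApplicationReport; usage may be null. */
  static final class Report {
    Usage usage;

    Report(Usage usage) {
      this.usage = usage;
    }
  }

  /** Adds the UAM counters into the AM report, tolerating missing usage. */
  static void mergeUsage(Report am, Report uam) {
    Usage uamUsage = uam.usage;
    if (uamUsage == null) {
      return; // nothing to add
    }
    if (am.usage == null) {
      // Assumption: adopt a copy of the UAM counters instead of failing.
      am.usage = new Usage(uamUsage.usedContainers, uamUsage.vcoreSeconds);
      return;
    }
    am.usage.usedContainers += uamUsage.usedContainers;
    am.usage.vcoreSeconds += uamUsage.vcoreSeconds;
  }

  public static void main(String[] args) {
    Report am = new Report(new Usage(1, 10L));
    mergeUsage(am, new Report(null));            // UAM without usage: no change
    mergeUsage(am, new Report(new Usage(2, 5L))); // both present: summed
    System.out.println(am.usage.usedContainers + " " + am.usage.vcoreSeconds);
  }
}
```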

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java
##########
@@ -52,4 +63,131 @@ public static GetClusterMetricsResponse merge(
     }
     return GetClusterMetricsResponse.newInstance(tmp);
   }
+
+  /**
+   * Merges a list of ApplicationReports grouping by ApplicationId.
+   * Our current policy is to merge the application reports from the reachable
+   * SubClusters.
+   * @param responses a list of ApplicationResponse to merge
+   * @param returnPartialResult if the merge ApplicationReports should contain
+   * partial result or not
+   * @return the merged ApplicationsResponse
+   */
+  public static GetApplicationsResponse mergeApplications(
+      Collection<GetApplicationsResponse> responses,
+      boolean returnPartialResult){
+    Map<ApplicationId, ApplicationReport> federationAM = new HashMap<>();
+    Map<ApplicationId, ApplicationReport> federationUAMSum = new HashMap<>();
+
+    for (GetApplicationsResponse appResponse : responses){
+      for (ApplicationReport appReport : appResponse.getApplicationList()){
+        ApplicationId appId = appReport.getApplicationId();
+        // Check if this ApplicationReport is an AM
+        if (appReport.getHost() != null) {
+          // Insert in the list of AM
+          federationAM.put(appId, appReport);
+          // Check if there are any UAM found before
+          if (federationUAMSum.containsKey(appId)) {
+            // Merge the current AM with the found UAM
+            mergeAMWithUAM(appReport, federationUAMSum.get(appId));
+            // Remove the sum of the UAMs
+            federationUAMSum.remove(appId);
+          }
+          // This ApplicationReport is an UAM
+        } else if (federationAM.containsKey(appId)) {
+          // Merge the current UAM with its own AM
+          mergeAMWithUAM(federationAM.get(appId), appReport);
+        } else if (federationUAMSum.containsKey(appId)) {
+          // Merge the current UAM with its own UAM and update the list of UAM
+          ApplicationReport mergedUAMReport =
+              mergeUAMWithUAM(federationUAMSum.get(appId), appReport);
+          federationUAMSum.put(appId, mergedUAMReport);
+        } else {
+          // Insert in the list of UAM
+          federationUAMSum.put(appId, appReport);
+        }
+      }
+    }
+    // Check the remaining UAMs are depending or not from federation
+    for (ApplicationReport appReport : federationUAMSum.values()) {
+      if (mergeUamToReport(appReport.getName(), returnPartialResult)) {
+        federationAM.put(appReport.getApplicationId(), appReport);
+      }
+    }
+
+    return GetApplicationsResponse.newInstance(federationAM.values());
+  }
+
+  private static ApplicationReport mergeUAMWithUAM(ApplicationReport uam1,
+      ApplicationReport uam2){
+    uam1.setName(PARTIAL_REPORT + uam1.getApplicationId());
+    mergeAMWithUAM(uam1, uam2);
+    return uam1;
+  }
+
+  private static void mergeAMWithUAM(ApplicationReport am,
+      ApplicationReport uam){
+    ApplicationResourceUsageReport amResourceReport =
+        am.getApplicationResourceUsageReport();
+
+    ApplicationResourceUsageReport uamResourceReport =
+        uam.getApplicationResourceUsageReport();
+
+    amResourceReport.setNumUsedContainers(
+        amResourceReport.getNumUsedContainers() +
+            uamResourceReport.getNumUsedContainers());
+
+    amResourceReport.setNumReservedContainers(
+        amResourceReport.getNumReservedContainers() +
+            uamResourceReport.getNumReservedContainers());
+
+    amResourceReport.setUsedResources(Resources.add(
+        amResourceReport.getUsedResources(),
+        uamResourceReport.getUsedResources()));
+
+    amResourceReport.setReservedResources(Resources.add(
+        amResourceReport.getReservedResources(),
+        uamResourceReport.getReservedResources()));
+
+    amResourceReport.setNeededResources(Resources.add(
+        amResourceReport.getNeededResources(),
+        uamResourceReport.getNeededResources()));
+
+    amResourceReport.setMemorySeconds(
+        amResourceReport.getMemorySeconds() +
+            uamResourceReport.getMemorySeconds());
+
+    amResourceReport.setVcoreSeconds(
+        amResourceReport.getVcoreSeconds() +
+            uamResourceReport.getVcoreSeconds());
+
+    amResourceReport.setQueueUsagePercentage(
+        amResourceReport.getQueueUsagePercentage() +
+            uamResourceReport.getQueueUsagePercentage());
+
+    amResourceReport.setClusterUsagePercentage(
+        amResourceReport.getClusterUsagePercentage() +
+            uamResourceReport.getClusterUsagePercentage());
+
+    am.setApplicationResourceUsageReport(amResourceReport);
+    am.getApplicationTags().addAll(uam.getApplicationTags());

Review comment:
       Tags need to be fetched only from the managed AM.
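
As a rough sketch of the behaviour requested above, the merge can keep summing resource counters while leaving the managed AM's tags untouched. The types below are simplified, hypothetical stand-ins for ApplicationReport, and `mergeAmWithUam` is an illustrative name, not the method from the pull request.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class ManagedAmTagsSketch {

  /** Hypothetical stand-in for an application report with tags. */
  static final class Report {
    int usedContainers;
    final Set<String> tags;

    Report(int usedContainers, Set<String> tags) {
      this.usedContainers = usedContainers;
      this.tags = new HashSet<>(tags);
    }
  }

  /** Sums the counters; tags are taken only from the managed AM. */
  static void mergeAmWithUam(Report am, Report uam) {
    am.usedContainers += uam.usedContainers;
    // Intentionally no am.tags.addAll(uam.tags): the AM's tags stay as reported.
  }

  public static void main(String[] args) {
    Report am = new Report(4, new HashSet<>(Arrays.asList("etl", "nightly")));
    Report uam = new Report(2, new HashSet<>(Arrays.asList("uam-internal")));

    mergeAmWithUam(am, uam);

    System.out.println(am.usedContainers);
    System.out.println(am.tags.contains("uam-internal"));
  }
}
```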






