Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/09/29 23:14:44 UTC

[GitHub] [hadoop] goiri commented on a diff in pull request #4954: YARN-11323. [Federation] Improve Router Handler FinishApps.

goiri commented on code in PR #4954:
URL: https://github.com/apache/hadoop/pull/4954#discussion_r984059221


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java:
##########
@@ -1054,4 +1060,24 @@ private void copyPlacementQueueToSubmissionContext(
       context.setQueue(placementContext.getQueue());
     }
   }
+
+  @VisibleForTesting
+  public void setFederationStateStoreService(FederationStateStoreService stateStoreService) {
+    this.federationStateStoreService = stateStoreService;
+  }
+
+  private void removeApplicationIdFromStateStore(ApplicationId applicationId) {
+    if (HAUtil.isFederationEnabled(conf) && federationStateStoreService != null) {
+      try {
+        DeleteApplicationHomeSubClusterResponse response =
+            federationStateStoreService.cleanUpFinishApplicationsWithRetries(applicationId);
+        if (response != null) {
+          LOG.info("applicationId = {} remove from state store success.",
+              applicationId);

Review Comment:
   Rename the variable to appId and put this log statement on a single line.
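For illustration, here is a minimal, self-contained sketch of the guarded single-application cleanup with the reviewer's suggestion applied (variable named appId, log call on one line). AppHomeCleaner and FederatedAppRemover are hypothetical stand-ins for FederationStateStoreService and RMAppManager, not Hadoop APIs. The quoted hunk is truncated, so the exception handling here is an assumption, and java.util.logging replaces the SLF4J logger used in the diff so the example has no dependencies.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the state-store delete call; not a Hadoop API. */
interface AppHomeCleaner {
  /** Returns a non-null response on success; may throw after its own retries. */
  Object deleteWithRetries(String appId) throws Exception;
}

/** Hypothetical holder for the guarded cleanup; mirrors the shape of the diff only. */
final class FederatedAppRemover {
  private static final Logger LOG = Logger.getLogger(FederatedAppRemover.class.getName());

  private final boolean federationEnabled;
  private final AppHomeCleaner cleaner; // may be null, as the diff's null check allows

  FederatedAppRemover(boolean federationEnabled, AppHomeCleaner cleaner) {
    this.federationEnabled = federationEnabled;
    this.cleaner = cleaner;
  }

  /** Returns true only when the delete ran and produced a non-null response. */
  boolean removeFromStateStore(String appId) {
    // Same guard as the diff: federation must be on and the service present.
    if (!federationEnabled || cleaner == null) {
      return false;
    }
    try {
      if (cleaner.deleteWithRetries(appId) != null) {
        // The reviewer's suggestion: short variable name, one-line log call.
        LOG.log(Level.INFO, "appId = {0} removed from state store.", appId);
        return true;
      }
    } catch (Exception e) {
      // Assumption: failures are logged and swallowed so the caller is not disrupted.
      LOG.log(Level.WARNING, "Failed to remove appId = " + appId + " from state store.", e);
    }
    return false;
  }
}
```

Returning a boolean is a choice made for the sketch so the outcome is easy to check; the method in the diff returns void.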



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java:
##########
@@ -378,4 +382,104 @@ public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyReques
       throws YarnException, IOException {
     throw new NotImplementedException("Code is not implemented");
   }
+
+  /**
+   * Create a thread that cleans up the app.
+   * @param stage rm-start/rm-stop.
+   */
+  public void createCleanUpFinishApplicationThread(String stage) {
+    String threadName = cleanUpThreadNamePrefix + "-" + stage;
+    Thread finishApplicationThread = new Thread(createCleanUpFinishApplicationThread());
+    finishApplicationThread.setName(threadName);
+    finishApplicationThread.start();
+  }
+
+  /**
+   * Create a thread that cleans up the app.
+   *
+   * @return thread object.
+   */
+  private Runnable createCleanUpFinishApplicationThread() {
+    return () -> {
+
+      try {
+        // Get the current RM's App list based on subClusterId
+        GetApplicationsHomeSubClusterRequest request =
+            GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
+        GetApplicationsHomeSubClusterResponse response =
+            getApplicationsHomeSubCluster(request);
+        List<ApplicationHomeSubCluster> applications = response.getAppsHomeSubClusters();
+
+        // Traverse the app list and clean up the app.
+        long successCleanUpAppCount = 0;
+        for (ApplicationHomeSubCluster application : applications) {
+          ApplicationId applicationId = application.getApplicationId();
+          if (!this.rmContext.getRMApps().containsKey(applicationId)) {
+            try {
+              DeleteApplicationHomeSubClusterResponse deleteResponse =
+                  cleanUpFinishApplicationsWithRetries(applicationId);
+              if (deleteResponse != null) {
+                LOG.info("application = {} has been cleaned up successfully.", applicationId);
+                successCleanUpAppCount++;
+              }
+            } catch (YarnException e) {
+              LOG.error("problem during application = {} cleanup.", applicationId, e);
+            }
+          }
+        }
+
+        // print app cleanup log
+        LOG.info("cleanup finished applications size = {}, number = {} successful cleanups.",
+            applications.size(), successCleanUpAppCount);
+      } catch (Exception e) {
+        LOG.error("problem during cleanup applications.", e);
+      }
+    };
+  }
+
+  /**
+   * Clean up the completed Application.
+   *
+   * @param applicationId app id.
+   * @return DeleteApplicationHomeSubClusterResponse.
+   * @throws Exception exception occurs.
+   */
+  public DeleteApplicationHomeSubClusterResponse
+      cleanUpFinishApplicationsWithRetries(ApplicationId applicationId) throws Exception {
+    DeleteApplicationHomeSubClusterRequest request =
+        DeleteApplicationHomeSubClusterRequest.newInstance(applicationId);
+    return new FederationStateStoreAction<DeleteApplicationHomeSubClusterResponse>() {
+      @Override
+      public DeleteApplicationHomeSubClusterResponse run() throws Exception {
+        return deleteApplicationHomeSubCluster(request);
+      }
+    }.runWithRetries();
+  }
+
+  /**
+   * Define an abstract class, abstract retry method,
+   * which can be used for other methods later.
+   *
+   * @param <T> abstract parameter
+   */
+  private abstract class FederationStateStoreAction<T> {
+    abstract T run() throws Exception;
+
+    T runWithRetries() throws Exception {
+      int retry = 0;
+      while (true) {

Review Comment:
   Doesn't a retry helper like this already exist?
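The body of runWithRetries is cut off in the quoted hunk, so the following is only a sketch of a bounded retry loop, assuming a fixed maximum number of attempts and a fixed pause between them; BoundedRetry and its parameters are hypothetical. Hadoop Common also ships a general retry framework in org.apache.hadoop.io.retry (for example RetryPolicies), which may be what the question refers to; that is an assumption, not something the thread confirms.

```java
import java.util.concurrent.Callable;

/** Hypothetical bounded-retry helper; not an existing Hadoop class. */
final class BoundedRetry {
  private BoundedRetry() {}

  /**
   * Calls the action up to maxAttempts times, pausing sleepMillis between
   * failed attempts, and rethrows the last failure if every attempt fails.
   */
  static <T> T runWithRetries(Callable<T> action, int maxAttempts, long sleepMillis)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (Exception e) {
        last = e;
        if (attempt < maxAttempts && sleepMillis > 0) {
          // Fixed pause; an InterruptedException here propagates and ends the retries.
          Thread.sleep(sleepMillis);
        }
      }
    }
    throw last; // non-null: the loop ran at least once and every attempt failed
  }
}
```

Unlike a bare while (true), the attempt bound guarantees the loop terminates and the last failure reaches the caller.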

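The cleanup thread in createCleanUpFinishApplicationThread boils down to a set difference: every application homed in this sub-cluster that the RM's application map no longer contains gets its home entry deleted, and successes are counted. The sketch below reproduces that sweep with plain collections, assuming application IDs are strings; HomeEntryDeleter and StaleAppSweeper are hypothetical names, not Hadoop classes.

```java
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical delete hook; returns true when the home entry was removed. */
interface HomeEntryDeleter {
  boolean delete(String appId) throws Exception;
}

/** Hypothetical sketch of the stale-application sweep; not a Hadoop class. */
final class StaleAppSweeper {
  private static final Logger LOG = Logger.getLogger(StaleAppSweeper.class.getName());

  private StaleAppSweeper() {}

  /**
   * Deletes the home entry of every app in homedApps that liveApps lacks.
   * Per-app failures are logged and skipped, so one bad entry does not stop the sweep.
   *
   * @return the number of successful deletions
   */
  static long sweep(List<String> homedApps, Set<String> liveApps, HomeEntryDeleter deleter) {
    long cleaned = 0;
    for (String appId : homedApps) {
      if (liveApps.contains(appId)) {
        continue; // the RM still tracks this app, so keep its home entry
      }
      try {
        if (deleter.delete(appId)) {
          cleaned++;
        }
      } catch (Exception e) {
        LOG.log(Level.WARNING, "Cleanup failed for application " + appId, e);
      }
    }
    LOG.info("Checked " + homedApps.size() + " applications, cleaned up " + cleaned + ".");
    return cleaned;
  }

  /** Runs sweep on a background thread named prefix + "-" + stage, as in the diff. */
  static Thread startSweepThread(String prefix, String stage, List<String> homedApps,
      Set<String> liveApps, HomeEntryDeleter deleter) {
    Thread thread = new Thread(() -> sweep(homedApps, liveApps, deleter));
    thread.setName(prefix + "-" + stage);
    thread.start();
    return thread;
  }
}
```

One difference is deliberate: the sketch catches Exception per application, while the quoted diff catches only YarnException inside the loop, so any other exception thrown by cleanUpFinishApplicationsWithRetries (declared as throws Exception) would end the whole sweep at the outer catch.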


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


For additional commands, e-mail: common-issues-help@hadoop.apache.org