You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/09/30 23:24:55 UTC

[GitHub] [hadoop] goiri commented on a diff in pull request #4954: YARN-11323. [Federation] Improve ResourceManager Handler FinishApps.

goiri commented on code in PR #4954:
URL: https://github.com/apache/hadoop/pull/4954#discussion_r984998385


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java:
##########
@@ -378,4 +397,137 @@ public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyReques
       throws YarnException, IOException {
     throw new NotImplementedException("Code is not implemented");
   }
+
+  /**
+   * Create a thread that cleans up the app.
+   * @param stage rm-start/rm-stop.
+   */
+  public void createCleanUpFinishApplicationThread(String stage) {
+    String threadName = cleanUpThreadNamePrefix + "-" + stage;
+    Thread finishApplicationThread = new Thread(createCleanUpFinishApplicationThread());
+    finishApplicationThread.setName(threadName);
+    finishApplicationThread.start();
+    LOG.info("CleanUpFinishApplicationThread has been started {}.", threadName);
+  }
+
+  /**
+   * Create a thread that cleans up the apps.
+   *
+   * @return thread object.
+   */
+  private Runnable createCleanUpFinishApplicationThread() {
+    return () -> {
+      createCleanUpFinishApplication();
+    };
+  }
+
+  /**
+   * cleans up the apps.
+   */
+  private void createCleanUpFinishApplication() {
+    try {
+      // Get the current RM's App list based on subClusterId
+      GetApplicationsHomeSubClusterRequest request =
+              GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
+      GetApplicationsHomeSubClusterResponse response =
+              getApplicationsHomeSubCluster(request);
+      List<ApplicationHomeSubCluster> applicationHomeSCs = response.getAppsHomeSubClusters();
+
+      // Traverse the app list and clean up the app.
+      long successCleanUpAppCount = 0;
+
+      // Save a local copy of the map so that it won't change with the map
+      Map<ApplicationId, RMApp> rmApps = new HashMap<>(this.rmContext.getRMApps());
+
+      // Need to make sure there is app list in RM memory.
+      if (rmApps != null && !rmApps.isEmpty()) {
+        for (ApplicationHomeSubCluster applicationHomeSC : applicationHomeSCs) {
+          ApplicationId applicationId = applicationHomeSC.getApplicationId();
+          if (!rmApps.containsKey(applicationId)) {
+            try {
+              Boolean cleanUpSuccess =
+                      cleanUpFinishApplicationsWithRetries(applicationId, false);
+              if (cleanUpSuccess) {
+                LOG.info("application = {} has been cleaned up successfully.", applicationId);
+                successCleanUpAppCount++;
+              }
+            } catch (Exception e) {
+              LOG.error("problem during application = {} cleanup.", applicationId, e);
+            }
+          }
+        }
+      }
+
+      // print app cleanup log
+      LOG.info("cleanup finished applications size = {}, number = {} successful cleanup.",
+              applicationHomeSCs.size(), successCleanUpAppCount);
+    } catch (Exception e) {
+      LOG.error("problem during cleanup applications.", e);
+    }
+  }
+
+  /**
+   * Clean up the federation completed Application.
+   *
+   * @param applicationId app id.
+   * @param isQuery true, need to query from statestore ; false not query.
+   * @throws Exception exception occurs.

Review Comment:
   Javadoc warning: the method returns a boolean, but its Javadoc has no @return tag.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java:
##########
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.retry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class FederationActionRetry<T> {
+
+  public static final Logger LOG =

Review Comment:
   Put this declaration on a single line.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java:
##########
@@ -4061,6 +4061,16 @@ public static boolean isAclEnabled(Configuration conf) {
 
   public static final int DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS = 1000;
 
+  public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT =
+      FEDERATION_PREFIX + "state-store.clean-up-retry-count";
+
+  public static final int DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT = 1;
+
+  public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME =
+      FEDERATION_PREFIX + "state-store.clean-up-retry-sleep-time";
+
+  public static final long DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME = 1000;

Review Comment:
   Use TimeUnit.SECONDS.toMillis(1) instead of the literal 1000.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml:
##########
@@ -3727,6 +3727,26 @@
     <value>yarnfederation/</value>
   </property>
 
+  <property>
+    <description>
+      The number of retries to clear the app in the FederationStateStore,
+      the default value is 1, that is, after the app fails to clean up, it will retry the cleanup again.
+    </description>
+    <name>yarn.federation.state-store.clean-up-retry-count</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>
+      Clear the sleep time of App retry in FederationStateStore.
+      When the app fails to clean up,
+      it will sleep for a period of time and then try to clean up.
+      The default value is 1000ms.
+    </description>
+    <name>yarn.federation.state-store.clean-up-retry-sleep-time</name>
+    <value>1000ms</value>

Review Comment:
   Use 1s instead of 1000ms.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java:
##########
@@ -207,4 +233,254 @@ public void testFederationStateStoreServiceInitialHeartbeatDelay() throws Except
         "Started federation membership heartbeat with interval: 300 and initial delay: 10"));
     rm.stop();
   }
+
+  @Test
+  public void testCleanUpApplication() throws Exception {
+
+    // set yarn configuration
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+
+    // set up MockRM
+    final MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+    rm.start();
+
+    // init subCluster Heartbeat,
+    // and check that the subCluster is in a running state
+    FederationStateStoreService stateStoreService =
+        rm.getFederationStateStoreService();
+    FederationStateStoreHeartbeat storeHeartbeat =
+        stateStoreService.getStateStoreHeartbeatThread();
+    storeHeartbeat.run();
+    checkSubClusterInfo(SubClusterState.SC_RUNNING);
+
+    // generate an application and join the [SC-1] cluster
+    ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+    addApplication2StateStore(appId, stateStore);
+
+    // make sure the app can be queried in the stateStore
+    GetApplicationHomeSubClusterRequest appRequest =
+         GetApplicationHomeSubClusterRequest.newInstance(appId);
+    GetApplicationHomeSubClusterResponse response =
+         stateStore.getApplicationHomeSubCluster(appRequest);
+    Assert.assertNotNull(response);
+    ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster();
+    Assert.assertNotNull(appHomeSubCluster);
+    Assert.assertNotNull(appHomeSubCluster.getApplicationId());
+    Assert.assertEquals(appId, appHomeSubCluster.getApplicationId());
+
+    // clean up the app.
+    boolean cleanUpResult =
+        stateStoreService.cleanUpFinishApplicationsWithRetries(appId, true);
+    Assert.assertTrue(cleanUpResult);
+
+    // after clean, the app can no longer be queried from the stateStore.
+    LambdaTestUtils.intercept(FederationStateStoreException.class,
+        "Application " + appId + " does not exist",
+        () -> stateStore.getApplicationHomeSubCluster(appRequest));
+
+  }
+
+  @Test
+  public void testCleanUpApplicationWhenRMStart() throws Exception {
+
+    // We design such a test case.
+    // Step1. We add app01, app02, app03 to the stateStore,
+    // But these apps are not in RM's RMContext, they are finished apps
+    // Step2. We simulate RM startup, there is only app04 in RMContext.
+    // Step3. We wait for 5 seconds, the automatic cleanup thread should clean up finished apps.
+
+    // set yarn configuration.
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+
+    // set up MockRM.
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+
+    // generate an [app01] and join the [SC-1] cluster.
+    List<ApplicationId> appIds = new ArrayList();

Review Comment:
   Raw type: specify the generic type here, e.g. new ArrayList<>().



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java:
##########
@@ -378,4 +397,137 @@ public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyReques
       throws YarnException, IOException {
     throw new NotImplementedException("Code is not implemented");
   }
+
+  /**
+   * Create a thread that cleans up the app.
+   * @param stage rm-start/rm-stop.
+   */
+  public void createCleanUpFinishApplicationThread(String stage) {
+    String threadName = cleanUpThreadNamePrefix + "-" + stage;
+    Thread finishApplicationThread = new Thread(createCleanUpFinishApplicationThread());
+    finishApplicationThread.setName(threadName);
+    finishApplicationThread.start();
+    LOG.info("CleanUpFinishApplicationThread has been started {}.", threadName);
+  }
+
+  /**
+   * Create a thread that cleans up the apps.
+   *
+   * @return thread object.
+   */
+  private Runnable createCleanUpFinishApplicationThread() {
+    return () -> {
+      createCleanUpFinishApplication();
+    };
+  }
+
+  /**
+   * cleans up the apps.
+   */
+  private void createCleanUpFinishApplication() {
+    try {
+      // Get the current RM's App list based on subClusterId
+      GetApplicationsHomeSubClusterRequest request =
+              GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
+      GetApplicationsHomeSubClusterResponse response =
+              getApplicationsHomeSubCluster(request);
+      List<ApplicationHomeSubCluster> applicationHomeSCs = response.getAppsHomeSubClusters();
+
+      // Traverse the app list and clean up the app.
+      long successCleanUpAppCount = 0;
+
+      // Save a local copy of the map so that it won't change with the map
+      Map<ApplicationId, RMApp> rmApps = new HashMap<>(this.rmContext.getRMApps());
+
+      // Need to make sure there is app list in RM memory.
+      if (rmApps != null && !rmApps.isEmpty()) {
+        for (ApplicationHomeSubCluster applicationHomeSC : applicationHomeSCs) {
+          ApplicationId applicationId = applicationHomeSC.getApplicationId();
+          if (!rmApps.containsKey(applicationId)) {
+            try {
+              Boolean cleanUpSuccess =
+                      cleanUpFinishApplicationsWithRetries(applicationId, false);
+              if (cleanUpSuccess) {
+                LOG.info("application = {} has been cleaned up successfully.", applicationId);
+                successCleanUpAppCount++;
+              }
+            } catch (Exception e) {
+              LOG.error("problem during application = {} cleanup.", applicationId, e);
+            }
+          }
+        }
+      }
+
+      // print app cleanup log
+      LOG.info("cleanup finished applications size = {}, number = {} successful cleanup.",
+              applicationHomeSCs.size(), successCleanUpAppCount);
+    } catch (Exception e) {
+      LOG.error("problem during cleanup applications.", e);
+    }
+  }
+
+  /**
+   * Clean up the federation completed Application.
+   *
+   * @param applicationId app id.
+   * @param isQuery true, need to query from statestore ; false not query.
+   * @throws Exception exception occurs.
+   */
+  public boolean cleanUpFinishApplicationsWithRetries(ApplicationId applicationId, boolean isQuery)
+      throws Exception {
+
+    // Generate a request to delete data
+    DeleteApplicationHomeSubClusterRequest delRequest =
+        DeleteApplicationHomeSubClusterRequest.newInstance(applicationId);
+
+    return new FederationActionRetry<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {

Review Comment:
   Can this be written as a lambda?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org