You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2022/10/11 21:53:32 UTC

[hadoop] branch trunk updated: YARN-11323. [Federation] Improve ResourceManager Handler FinishApps. (#4954)

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d78b0b39a63 YARN-11323. [Federation] Improve ResourceManager Handler FinishApps. (#4954)
d78b0b39a63 is described below

commit d78b0b39a6360c7131662c818dd498fc644484d7
Author: slfan1989 <55...@users.noreply.github.com>
AuthorDate: Wed Oct 12 05:53:02 2022 +0800

    YARN-11323. [Federation] Improve ResourceManager Handler FinishApps. (#4954)
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  11 +
 .../src/main/resources/yarn-default.xml            |  20 ++
 .../federation/retry/FederationActionRetry.java    |  45 ++++
 .../yarn/server/federation/retry/package-info.java |  19 ++
 .../yarn/server/resourcemanager/RMAppManager.java  |  43 ++++
 .../server/resourcemanager/ResourceManager.java    |   8 +
 .../federation/FederationStateStoreService.java    | 175 +++++++++++++
 .../TestFederationRMStateStoreService.java         | 275 +++++++++++++++++++++
 8 files changed, 596 insertions(+)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 7427533fbe7..2b7c01b0bd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4061,6 +4061,17 @@ public class YarnConfiguration extends Configuration {
 
   public static final int DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS = 1000;
 
+  /** Key of the number of retries when cleaning up an app in the FederationStateStore. */
+  public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT =
+      FEDERATION_PREFIX + "state-store.clean-up-retry-count";
+
+  /** Default number of cleanup retries. */
+  public static final int DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT = 1;
+
+  /** Key of the sleep time between two cleanup retries in the FederationStateStore. */
+  public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME =
+      FEDERATION_PREFIX + "state-store.clean-up-retry-sleep-time";
+
+  /** Default sleep time between two cleanup retries: one second, expressed in milliseconds. */
+  public static final long DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME =
+      TimeUnit.SECONDS.toMillis(1);
+
   public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
 
   public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index fdfa1296dc5..638818ce33d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3727,6 +3727,26 @@
     <value>yarnfederation/</value>
   </property>
 
+  <property>
+    <description>
+      The number of times to retry removing a finished app from the FederationStateStore.
+      The default value is 1, that is, after a cleanup attempt fails, it is retried once more.
+    </description>
+    <name>yarn.federation.state-store.clean-up-retry-count</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>
+      The sleep time between retries when removing a finished app from the FederationStateStore.
+      When a cleanup attempt fails,
+      the RM sleeps for this period of time and then tries the cleanup again.
+      The default value is 1s.
+    </description>
+    <name>yarn.federation.state-store.clean-up-retry-sleep-time</name>
+    <value>1s</value>
+  </property>
+
   <!-- Other Configuration -->
 
   <property>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java
new file mode 100644
index 00000000000..634e7689645
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.retry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An action executed against the Federation that can be run again when it fails.
+ *
+ * @param <T> type of the value produced by the action.
+ */
+@FunctionalInterface
+public interface FederationActionRetry<T> {
+
+  Logger LOG = LoggerFactory.getLogger(FederationActionRetry.class);
+
+  /**
+   * Execute the action once.
+   *
+   * @return the result of the action.
+   * @throws Exception if the action fails.
+   */
+  T run() throws Exception;
+
+  /**
+   * Execute {@link #run()} and, when it throws, sleep and execute it again
+   * until it succeeds or the retries are used up.
+   * With a retryCount of N the action is executed at most N + 1 times.
+   *
+   * @param retryCount maximum number of retries after the first failed attempt.
+   * @param retrySleepTime time to sleep before each retry, in milliseconds.
+   * @return the result of the first successful execution.
+   * @throws InterruptedException if the thread is interrupted while running the
+   *         action or while sleeping; the interrupt flag is set again before
+   *         the exception is rethrown and no further retry is attempted.
+   * @throws Exception the exception of the last attempt once the retries are used up.
+   */
+  default T runWithRetries(int retryCount, long retrySleepTime) throws Exception {
+    int retry = 0;
+    while (true) {
+      try {
+        return run();
+      } catch (InterruptedException e) {
+        // An interrupt asks the thread to stop, it is not a transient failure:
+        // keep the interrupt flag visible to the caller and do not retry.
+        Thread.currentThread().interrupt();
+        throw e;
+      } catch (Exception e) {
+        LOG.info("Exception while executing an Federation operation.", e);
+        if (++retry > retryCount) {
+          LOG.info("Maxed out Federation retries. Giving up!");
+          throw e;
+        }
+        LOG.info("Retrying operation on Federation. Retry no. {}", retry);
+        try {
+          Thread.sleep(retrySleepTime);
+        } catch (InterruptedException ie) {
+          // Thread.sleep clears the flag when it throws, set it again.
+          Thread.currentThread().interrupt();
+          throw ie;
+        }
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java
new file mode 100644
index 00000000000..5d8477cfe59
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Federation Retry Policies. **/
+package org.apache.hadoop.yarn.server.federation.retry;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index f847152c47d..87596e16ee6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -28,9 +28,11 @@ import java.util.concurrent.Future;
 
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer;
 import org.apache.hadoop.yarn.security.Permission;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
+import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,6 +116,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
   private boolean nodeLabelsEnabled;
   private Set<String> exclusiveEnforcedPartitions;
   private String amDefaultNodeLabel;
+  private FederationStateStoreService federationStateStoreService;
 
   private static final String USER_ID_PREFIX = "userid=";
 
@@ -347,6 +350,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
           + ", removing app " + removeApp.getApplicationId()
           + " from state store.");
       rmContext.getStateStore().removeApplication(removeApp);
+      removeApplicationIdFromStateStore(removeId);
       completedAppsInStateStore--;
     }
 
@@ -358,6 +362,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
           + this.maxCompletedAppsInMemory + ", removing app " + removeId
           + " from memory: ");
       rmContext.getRMApps().remove(removeId);
+      removeApplicationIdFromStateStore(removeId);
       this.applicationACLsManager.removeApplication(removeId);
     }
   }
@@ -1054,4 +1059,42 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       context.setQueue(placementContext.getQueue());
     }
   }
+
+  /**
+   * Set the FederationStateStoreService used to remove finished applications
+   * from the federation state store.
+   *
+   * @param stateStoreService federation state store service.
+   */
+  @VisibleForTesting
+  public void setFederationStateStoreService(FederationStateStoreService stateStoreService) {
+    federationStateStoreService = stateStoreService;
+  }
+
+  /**
+   * Remove a finished application from the federation state store.
+   * Nothing is done unless federation is enabled and a
+   * FederationStateStoreService has been set. A failed removal is only logged,
+   * it is never propagated to the caller.
+   *
+   * @param appId id of the application to remove.
+   */
+  private void removeApplicationIdFromStateStore(ApplicationId appId) {
+    if (!HAUtil.isFederationEnabled(conf) || federationStateStoreService == null) {
+      return;
+    }
+    try {
+      if (federationStateStoreService.cleanUpFinishApplicationsWithRetries(appId, true)) {
+        LOG.info("applicationId = {} remove from state store success.", appId);
+      } else {
+        LOG.warn("applicationId = {} remove from state store failed.", appId);
+      }
+    } catch (Exception e) {
+      LOG.error("applicationId = {} remove from state store error.", appId, e);
+    }
+  }
+
+  /**
+   * Test-only entry point that runs checkAppNumCompletedLimit().
+   */
+  @VisibleForTesting
+  public void checkAppNumCompletedLimit4Test() {
+    this.checkAppNumCompletedLimit();
+  }
+
+  /**
+   * Test-only entry point that runs finishApplication(ApplicationId).
+   *
+   * @param applicationId id of the application to finish.
+   */
+  @VisibleForTesting
+  public void finishApplication4Test(ApplicationId applicationId) {
+    this.finishApplication(applicationId);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 8adcff42a69..1bcfdbbafa8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -917,6 +917,7 @@ public class ResourceManager extends CompositeService
         }
         federationStateStoreService = createFederationStateStoreService();
         addIfService(federationStateStoreService);
+        rmAppManager.setFederationStateStoreService(federationStateStoreService);
         LOG.info("Initialized Federation membership.");
       }
 
@@ -996,6 +997,13 @@ public class ResourceManager extends CompositeService
           RMState state = rmStore.loadState();
           recover(state);
           LOG.info("Recovery ended");
+
+          // Make sure that the App is cleaned up after the RM memory is restored.
+          if (HAUtil.isFederationEnabled(conf)) {
+            federationStateStoreService.
+                createCleanUpFinishApplicationThread("Recovery");
+          }
+
         } catch (Exception e) {
           // the Exception from loadState() needs to be handled for
           // HA and we need to give up master status if we got fenced
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
index 060540d01ee..1d67af926d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.federation;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.List;
 
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
@@ -29,9 +32,11 @@ import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
@@ -74,10 +79,12 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationH
 import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,6 +109,9 @@ public class FederationStateStoreService extends AbstractService
   private long heartbeatInterval;
   private long heartbeatInitialDelay;
   private RMContext rmContext;
+  private String cleanUpThreadNamePrefix = "FederationStateStoreService-Clean-Thread";
+  private int cleanUpRetryCountNum;
+  private long cleanUpRetrySleepTime;
 
   public FederationStateStoreService(RMContext rmContext) {
     super(FederationStateStoreService.class.getName());
@@ -149,6 +159,15 @@ public class FederationStateStoreService extends AbstractService
       heartbeatInitialDelay =
           YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY;
     }
+
+    cleanUpRetryCountNum = conf.getInt(YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT,
+        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT);
+
+    cleanUpRetrySleepTime = conf.getTimeDuration(
+        YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME,
+        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME,
+        TimeUnit.MILLISECONDS);
+
     LOG.info("Initialized federation membership service.");
 
     super.serviceInit(conf);
@@ -378,4 +397,160 @@ public class FederationStateStoreService extends AbstractService
       throws YarnException, IOException {
     throw new NotImplementedException("Code is not implemented");
   }
+
+  /**
+   * Start a thread that cleans up the finished apps in the FederationStateStore.
+   *
+   * @param stage name of the stage that triggers the cleanup,
+   *              it is only used as the suffix of the thread name.
+   */
+  public void createCleanUpFinishApplicationThread(String stage) {
+    String threadName = cleanUpThreadNamePrefix + "-" + stage;
+    Thread cleanUpThread = new Thread(createCleanUpFinishApplicationThread(), threadName);
+    cleanUpThread.start();
+    LOG.info("CleanUpFinishApplicationThread has been started {}.", threadName);
+  }
+
+  /**
+   * Build the task that is executed by the cleanup thread.
+   *
+   * @return runnable that cleans up the apps.
+   */
+  private Runnable createCleanUpFinishApplicationThread() {
+    return this::createCleanUpFinishApplication;
+  }
+
+  /**
+   * Clean up the apps that are recorded in the FederationStateStore for this
+   * subCluster but are not present in the RM memory.
+   * Failures are logged and never propagated.
+   */
+  private void createCleanUpFinishApplication() {
+    try {
+      // Apps recorded in the StateStore for the subClusterId of this RM.
+      GetApplicationsHomeSubClusterResponse response = getApplicationsHomeSubCluster(
+          GetApplicationsHomeSubClusterRequest.newInstance(subClusterId));
+      List<ApplicationHomeSubCluster> storedApps = response.getAppsHomeSubClusters();
+
+      // Work on a local copy so that later changes of the RM app map are not observed.
+      Map<ApplicationId, RMApp> appsInMemory = new HashMap<>(this.rmContext.getRMApps());
+
+      long cleanedUpCount = 0;
+
+      // Nothing is cleaned up while the RM holds no app in memory.
+      if (!appsInMemory.isEmpty()) {
+        for (ApplicationHomeSubCluster storedApp : storedApps) {
+          ApplicationId applicationId = storedApp.getApplicationId();
+          if (appsInMemory.containsKey(applicationId)) {
+            continue;
+          }
+          // A failure on one app must not prevent the cleanup of the next ones.
+          try {
+            if (cleanUpFinishApplicationsWithRetries(applicationId, false)) {
+              LOG.info("application = {} has been cleaned up successfully.", applicationId);
+              cleanedUpCount++;
+            }
+          } catch (Exception e) {
+            LOG.error("problem during application = {} cleanup.", applicationId, e);
+          }
+        }
+      }
+
+      // Summary of the cleanup.
+      LOG.info("cleanup finished applications size = {}, number = {} successful cleanup.",
+          storedApps.size(), cleanedUpCount);
+    } catch (Exception e) {
+      LOG.error("problem during cleanup applications.", e);
+    }
+  }
+
+  /**
+   * Clean up a completed federation Application, running the cleanup again
+   * when an attempt throws an exception.
+   *
+   * @param appId app id.
+   * @param isQuery true, need to query from statestore, false not query.
+   * @throws Exception exception occurs.
+   * @return true, successfully deleted; false, failed to delete or no need to delete
+   */
+  public boolean cleanUpFinishApplicationsWithRetries(ApplicationId appId, boolean isQuery)
+      throws Exception {
+
+    // The same delete request is reused by every attempt.
+    DeleteApplicationHomeSubClusterRequest deleteRequest =
+        DeleteApplicationHomeSubClusterRequest.newInstance(appId);
+
+    FederationActionRetry<Boolean> cleanUpAction =
+        () -> invokeCleanUpFinishApp(appId, isQuery, deleteRequest);
+
+    return cleanUpAction.runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime);
+  }
+
+  /**
+   * Delete a finished App from the StateStore, optionally checking first
+   * that the App needs to be cleaned up.
+   *
+   * @param applicationId app id.
+   * @param isQuery true, need to query from statestore, false not query.
+   * @param delRequest delete Application Request
+   * @return true, successfully deleted; false, failed to delete or no need to delete
+   * @throws YarnException if the delete request fails.
+   */
+  private boolean invokeCleanUpFinishApp(ApplicationId applicationId, boolean isQuery,
+      DeleteApplicationHomeSubClusterRequest delRequest) throws YarnException {
+    // When a query is requested, the App is only deleted if the query allows it.
+    if (isQuery && !isApplicationNeedClean(applicationId)) {
+      return false;
+    }
+    DeleteApplicationHomeSubClusterResponse response =
+        deleteApplicationHomeSubCluster(delRequest);
+    if (response == null) {
+      return false;
+    }
+    LOG.info("The applicationId = {} has been successfully cleaned up.", applicationId);
+    return true;
+  }
+
+  /**
+   * Used to determine whether the Application needs to be cleaned up.
+   *
+   * The HomeSC corresponding to the app is queried in the StateStore.
+   * The app needs to be cleaned up only if the current RM is the HomeSC.
+   * If the query returns nothing or throws, the app is not cleaned up.
+   *
+   * @param applicationId applicationId
+   * @return true, app needs to be cleaned up;
+   *         false, app doesn't need to be cleaned up.
+   */
+  private boolean isApplicationNeedClean(ApplicationId applicationId) {
+    GetApplicationHomeSubClusterRequest queryRequest =
+        GetApplicationHomeSubClusterRequest.newInstance(applicationId);
+    // The try...catch is required,
+    // because getApplicationHomeSubCluster may throw a not exist exception.
+    try {
+      GetApplicationHomeSubClusterResponse queryResp =
+          getApplicationHomeSubCluster(queryRequest);
+      if (queryResp == null) {
+        LOG.warn("The applicationId = {} not belong subCluster = {} " +
+            " and is not allowed to delete.", applicationId, subClusterId);
+        return false;
+      }
+      ApplicationHomeSubCluster appHomeSC = queryResp.getApplicationHomeSubCluster();
+      SubClusterId homeSubClusterId = appHomeSC.getHomeSubCluster();
+      if (subClusterId.equals(homeSubClusterId)) {
+        return true;
+      }
+      LOG.warn("The homeSubCluster of applicationId = {} belong subCluster = {}, " +
+          " not belong subCluster = {} and is not allowed to delete.",
+          applicationId, homeSubClusterId, subClusterId);
+      return false;
+    } catch (Exception e) {
+      LOG.warn("query applicationId = {} error.", applicationId, e);
+      return false;
+    }
+  }
+
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
index e8ebdd5bedd..b8e2ce6ef32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
@@ -20,22 +20,46 @@ package org.apache.hadoop.yarn.server.resourcemanager.federation;
 import java.io.IOException;
 import java.io.StringReader;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 import javax.xml.bind.JAXBException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
 import org.junit.After;
 import org.junit.Assert;
@@ -46,6 +70,8 @@ import com.sun.jersey.api.json.JSONConfiguration;
 import com.sun.jersey.api.json.JSONJAXBContext;
 import com.sun.jersey.api.json.JSONUnmarshaller;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Unit tests for FederationStateStoreService.
  */
@@ -207,4 +233,253 @@ public class TestFederationRMStateStoreService {
         "Started federation membership heartbeat with interval: 300 and initial delay: 10"));
     rm.stop();
   }
+
+  /**
+   * Verifies that an application registered in the federation state store
+   * can no longer be queried after
+   * {@code cleanUpFinishApplicationsWithRetries} reports success.
+   *
+   * @throws Exception if the RM cannot be started or a state store call fails.
+   */
+  @Test
+  public void testCleanUpApplication() throws Exception {
+
+    // set yarn configuration
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+
+    // set up MockRM
+    final MockRM rm = new MockRM(conf);
+    rm.init(conf);
+
+    try {
+      stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+      rm.start();
+
+      // init subCluster Heartbeat,
+      // and check that the subCluster is in a running state
+      FederationStateStoreService stateStoreService =
+          rm.getFederationStateStoreService();
+      FederationStateStoreHeartbeat storeHeartbeat =
+          stateStoreService.getStateStoreHeartbeatThread();
+      storeHeartbeat.run();
+      checkSubClusterInfo(SubClusterState.SC_RUNNING);
+
+      // generate an application and join the [SC-1] cluster
+      ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+      addApplication2StateStore(appId, stateStore);
+
+      // make sure the app can be queried in the stateStore
+      GetApplicationHomeSubClusterRequest appRequest =
+          GetApplicationHomeSubClusterRequest.newInstance(appId);
+      GetApplicationHomeSubClusterResponse response =
+          stateStore.getApplicationHomeSubCluster(appRequest);
+      Assert.assertNotNull(response);
+      ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster();
+      Assert.assertNotNull(appHomeSubCluster);
+      Assert.assertNotNull(appHomeSubCluster.getApplicationId());
+      Assert.assertEquals(appId, appHomeSubCluster.getApplicationId());
+
+      // clean up the app.
+      boolean cleanUpResult =
+          stateStoreService.cleanUpFinishApplicationsWithRetries(appId, true);
+      Assert.assertTrue(cleanUpResult);
+
+      // after clean, the app can no longer be queried from the stateStore.
+      LambdaTestUtils.intercept(FederationStateStoreException.class,
+          "Application " + appId + " does not exist",
+          () -> stateStore.getApplicationHomeSubCluster(appRequest));
+    } finally {
+      // always stop the RM, even when an assertion fails, so that its
+      // services do not leak into the tests that run afterwards.
+      rm.stop();
+    }
+  }
+
+  /**
+   * Verifies that applications which exist in the federation state store but
+   * not in the RM's RMContext are removed from the state store once the RM
+   * has been started, while an application known to the RMContext is kept.
+   *
+   * @throws Exception if the RM cannot be started, a state store call fails,
+   *     or the state store is not cleaned up within the timeout.
+   */
+  @Test
+  public void testCleanUpApplicationWhenRMStart() throws Exception {
+
+    // We design such a test case.
+    // Step1. We add app01, app02, app03 to the stateStore,
+    // But these apps are not in RM's RMContext, they are finished apps
+    // Step2. We simulate RM startup, there is only app04 in RMContext.
+    // Step3. We wait for up to 5 seconds, the automatic cleanup thread
+    // should clean up finished apps.
+
+    // set yarn configuration.
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+
+    // set up MockRM.
+    final MockRM rm = new MockRM(conf);
+    rm.init(conf);
+
+    try {
+      stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+
+      // generate an [app01] and join the [SC-1] cluster.
+      List<ApplicationId> appIds = new ArrayList<>();
+      ApplicationId appId01 = ApplicationId.newInstance(Time.now(), 1);
+      addApplication2StateStore(appId01, stateStore);
+      appIds.add(appId01);
+
+      // generate an [app02] and join the [SC-1] cluster.
+      ApplicationId appId02 = ApplicationId.newInstance(Time.now(), 2);
+      addApplication2StateStore(appId02, stateStore);
+      appIds.add(appId02);
+
+      // generate an [app03] and join the [SC-1] cluster.
+      ApplicationId appId03 = ApplicationId.newInstance(Time.now(), 3);
+      addApplication2StateStore(appId03, stateStore);
+      appIds.add(appId03);
+
+      // make sure the apps can be queried in the stateStore.
+      GetApplicationsHomeSubClusterRequest allRequest =
+          GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
+      GetApplicationsHomeSubClusterResponse allResponse =
+          stateStore.getApplicationsHomeSubCluster(allRequest);
+      Assert.assertNotNull(allResponse);
+      List<ApplicationHomeSubCluster> appHomeSCLists = allResponse.getAppsHomeSubClusters();
+      Assert.assertNotNull(appHomeSCLists);
+      Assert.assertEquals(3, appHomeSCLists.size());
+
+      // app04 exists in both RM memory and stateStore.
+      ApplicationId appId04 = ApplicationId.newInstance(Time.now(), 4);
+      addApplication2StateStore(appId04, stateStore);
+      addApplication2RMAppManager(rm, appId04);
+
+      // start rm.
+      rm.start();
+
+      // poll every 100ms, for at most 5s, until only one app (app04)
+      // is left in the stateStore.
+      GenericTestUtils.waitFor(() -> {
+        try {
+          // the helper already asserts that the returned list is not null.
+          return getApplicationsFromStateStore().size() == 1;
+        } catch (YarnException e) {
+          // a failed query is treated as "not cleaned up yet" and retried
+          // on the next poll; the stack trace is kept for diagnosis.
+          e.printStackTrace();
+          return false;
+        }
+      }, 100, 1000 * 5);
+
+      // check the app to make sure the apps(app01,app02,app03) doesn't exist.
+      for (ApplicationId appId : appIds) {
+        GetApplicationHomeSubClusterRequest appRequest =
+            GetApplicationHomeSubClusterRequest.newInstance(appId);
+        LambdaTestUtils.intercept(FederationStateStoreException.class,
+            "Application " + appId + " does not exist",
+            () -> stateStore.getApplicationHomeSubCluster(appRequest));
+      }
+    } finally {
+      // always stop the RM, even when an assertion fails or the wait times
+      // out, so that its services do not leak into the following tests.
+      rm.stop();
+    }
+  }
+
+  /**
+   * Verifies that, with the RM state store limited to one completed
+   * application, finishing app01, app02 and app03 and then enforcing the
+   * completed-apps limit removes app01 and app02 from the federation state
+   * store while app03 remains.
+   *
+   * @throws Exception if the RM cannot be started or a state store call fails.
+   */
+  @Test
+  public void testCleanUpApplicationWhenRMCompleteOneApp() throws Exception {
+
+    // We design such a test case.
+    // Step1. We start RM,Set the RM memory to keep a maximum of 1 completed app.
+    // Step2. Register app[01-03] to RM memory & stateStore.
+    // Step3. We clean up app01, app02, app03, at this time,
+    // app01, app02 should be cleaned up from statestore, app03 should remain in statestore.
+
+    // set yarn configuration.
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+
+    // set up MockRM.
+    final MockRM rm = new MockRM(conf);
+    rm.init(conf);
+
+    try {
+      stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+      rm.start();
+
+      // generate an [app01] and join the [SC-1] cluster.
+      List<ApplicationId> appIds = new ArrayList<>();
+      ApplicationId appId01 = ApplicationId.newInstance(Time.now(), 1);
+      addApplication2StateStore(appId01, stateStore);
+      addApplication2RMAppManager(rm, appId01);
+      appIds.add(appId01);
+
+      // generate an [app02] and join the [SC-1] cluster.
+      ApplicationId appId02 = ApplicationId.newInstance(Time.now(), 2);
+      addApplication2StateStore(appId02, stateStore);
+      addApplication2RMAppManager(rm, appId02);
+      appIds.add(appId02);
+
+      // generate an [app03] and join the [SC-1] cluster.
+      // app03 is intentionally not added to appIds: it is expected to remain.
+      ApplicationId appId03 = ApplicationId.newInstance(Time.now(), 3);
+      addApplication2StateStore(appId03, stateStore);
+      addApplication2RMAppManager(rm, appId03);
+
+      // rmAppManager
+      RMAppManager rmAppManager = rm.getRMAppManager();
+      rmAppManager.finishApplication4Test(appId01);
+      rmAppManager.finishApplication4Test(appId02);
+      rmAppManager.finishApplication4Test(appId03);
+      rmAppManager.checkAppNumCompletedLimit4Test();
+
+      // app01, app02 should be cleaned from statestore
+      // After the query, it should report the error not exist.
+      for (ApplicationId appId : appIds) {
+        GetApplicationHomeSubClusterRequest appRequest =
+            GetApplicationHomeSubClusterRequest.newInstance(appId);
+        LambdaTestUtils.intercept(FederationStateStoreException.class,
+            "Application " + appId + " does not exist",
+            () -> stateStore.getApplicationHomeSubCluster(appRequest));
+      }
+
+      // app03 should remain in statestore
+      List<ApplicationHomeSubCluster> appHomeScList = getApplicationsFromStateStore();
+      Assert.assertNotNull(appHomeScList);
+      Assert.assertEquals(1, appHomeScList.size());
+      ApplicationHomeSubCluster homeSubCluster = appHomeScList.get(0);
+      Assert.assertNotNull(homeSubCluster);
+      Assert.assertEquals(appId03, homeSubCluster.getApplicationId());
+    } finally {
+      // always stop the RM, even when an assertion fails, so that its
+      // services do not leak into the tests that run afterwards.
+      rm.stop();
+    }
+  }
+
+  /**
+   * Records the given application in the given federation state store, using
+   * this test's {@code subClusterId} as its home sub-cluster.
+   *
+   * @param appId id of the application to register.
+   * @param fedStateStore state store that receives the mapping.
+   * @throws YarnException if the state store rejects the request.
+   */
+  private void addApplication2StateStore(ApplicationId appId,
+      FederationStateStore fedStateStore) throws YarnException {
+    ApplicationHomeSubCluster homeMapping =
+        ApplicationHomeSubCluster.newInstance(appId, subClusterId);
+    fedStateStore.addApplicationHomeSubCluster(
+        AddApplicationHomeSubClusterRequest.newInstance(homeMapping));
+  }
+
+  /**
+   * Queries {@code stateStore} for the application mappings of this test's
+   * {@code subClusterId}, asserting that neither the response nor the
+   * returned list is null.
+   *
+   * @return the application to home sub-cluster mappings from the response.
+   * @throws YarnException if the state store query fails.
+   */
+  private List<ApplicationHomeSubCluster> getApplicationsFromStateStore() throws YarnException {
+    GetApplicationsHomeSubClusterRequest request =
+        GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
+    GetApplicationsHomeSubClusterResponse response =
+        stateStore.getApplicationsHomeSubCluster(request);
+    Assert.assertNotNull(response);
+
+    List<ApplicationHomeSubCluster> homeSubClusters = response.getAppsHomeSubClusters();
+    Assert.assertNotNull(homeSubClusters);
+    return homeSubClusters;
+  }
+
+  /**
+   * Builds an {@link RMAppImpl} for the given application id, backed by a
+   * mocked scheduler, and puts it into the RM context's application map
+   * unless an entry for that id is already present.
+   *
+   * @param rm the MockRM whose RMContext receives the application.
+   * @param appId id of the application to add.
+   */
+  private void addApplication2RMAppManager(MockRM rm, ApplicationId appId) {
+    RMContext context = rm.getRMContext();
+    Map<ApplicationId, RMApp> knownApps = context.getRMApps();
+
+    // generated user, application name and queue for the new app
+    String appUser = MockApps.newUserName();
+    String appName = MockApps.newAppName();
+    String appQueue = MockApps.newQueue();
+
+    YarnScheduler mockScheduler = mock(YarnScheduler.class);
+    ApplicationMasterService amService =
+        new ApplicationMasterService(context, mockScheduler);
+
+    // the submission context carries the same application id as the RMApp
+    ApplicationSubmissionContext appSubmissionContext =
+        new ApplicationSubmissionContextPBImpl();
+    appSubmissionContext.setApplicationId(appId);
+    appSubmissionContext.setPriority(Priority.newInstance(0));
+
+    RMApp rmApp = new RMAppImpl(appId, context, conf, appName,
+        appUser, appQueue, appSubmissionContext, mockScheduler, amService,
+        System.currentTimeMillis(), "YARN", null,
+        new ArrayList<>());
+
+    knownApps.putIfAbsent(rmApp.getApplicationId(), rmApp);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org