You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2022/10/11 21:53:32 UTC
[hadoop] branch trunk updated: YARN-11323. [Federation] Improve ResourceManager Handler FinishApps. (#4954)
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new d78b0b39a63 YARN-11323. [Federation] Improve ResourceManager Handler FinishApps. (#4954)
d78b0b39a63 is described below
commit d78b0b39a6360c7131662c818dd498fc644484d7
Author: slfan1989 <55...@users.noreply.github.com>
AuthorDate: Wed Oct 12 05:53:02 2022 +0800
YARN-11323. [Federation] Improve ResourceManager Handler FinishApps. (#4954)
---
.../apache/hadoop/yarn/conf/YarnConfiguration.java | 11 +
.../src/main/resources/yarn-default.xml | 20 ++
.../federation/retry/FederationActionRetry.java | 45 ++++
.../yarn/server/federation/retry/package-info.java | 19 ++
.../yarn/server/resourcemanager/RMAppManager.java | 43 ++++
.../server/resourcemanager/ResourceManager.java | 8 +
.../federation/FederationStateStoreService.java | 175 +++++++++++++
.../TestFederationRMStateStoreService.java | 275 +++++++++++++++++++++
8 files changed, 596 insertions(+)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 7427533fbe7..2b7c01b0bd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4061,6 +4061,17 @@ public class YarnConfiguration extends Configuration {
public static final int DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS = 1000;
+  /**
+   * Number of times the ResourceManager retries removing a finished application
+   * from the FederationStateStore after the first attempt fails.
+   */
+  public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT =
+      FEDERATION_PREFIX + "state-store.clean-up-retry-count";
+
+  /** Default for {@link #FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT}: one retry. */
+  public static final int DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT = 1;
+
+  /**
+   * Time to sleep between attempts to remove a finished application from the
+   * FederationStateStore. Read as a time duration.
+   */
+  public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME =
+      FEDERATION_PREFIX + "state-store.clean-up-retry-sleep-time";
+
+  /** Default for {@link #FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME}: one second, in ms. */
+  public static final long DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME =
+      TimeUnit.SECONDS.toMillis(1);
+
public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index fdfa1296dc5..638818ce33d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3727,6 +3727,26 @@
<value>yarnfederation/</value>
</property>
+ <property>
+ <description>
+ The number of times to retry cleaning up a finished application in the
+ FederationStateStore. The default value is 1, i.e. if the cleanup fails,
+ it is retried once more.
+ </description>
+ <name>yarn.federation.state-store.clean-up-retry-count</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <description>
+ The time to sleep before retrying when cleaning up a finished application
+ in the FederationStateStore fails. The default value is 1s.
+ </description>
+ <name>yarn.federation.state-store.clean-up-retry-sleep-time</name>
+ <value>1s</value>
+ </property>
+
<!-- Other Configuration -->
<property>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java
new file mode 100644
index 00000000000..634e7689645
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.retry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An action against the Federation StateStore that can be retried.
+ *
+ * @param <T> the result type of the action.
+ */
+@FunctionalInterface
+public interface FederationActionRetry<T> {
+
+  Logger LOG = LoggerFactory.getLogger(FederationActionRetry.class);
+
+  /**
+   * Executes the action once.
+   *
+   * @return the result of the action.
+   * @throws Exception if the action fails.
+   */
+  T run() throws Exception;
+
+  /**
+   * Executes {@link #run()}, retrying after any exception.
+   *
+   * The action is attempted at most {@code retryCount + 1} times (only once when
+   * {@code retryCount} is not positive). Between attempts the calling thread sleeps for
+   * {@code retrySleepTime} milliseconds; a non-positive sleep time retries immediately
+   * instead of failing inside {@link Thread#sleep(long)}.
+   *
+   * @param retryCount number of retries after the first failed attempt.
+   * @param retrySleepTime time to sleep between attempts, in milliseconds.
+   * @return the result of the first successful call to {@link #run()}.
+   * @throws InterruptedException if interrupted while sleeping between attempts; the
+   *     thread's interrupt status is restored and the failure that triggered the retry
+   *     is attached as a suppressed exception.
+   * @throws Exception the exception from the last attempt once retries are exhausted.
+   */
+  default T runWithRetries(int retryCount, long retrySleepTime) throws Exception {
+    int retry = 0;
+    while (true) {
+      try {
+        return run();
+      } catch (Exception e) {
+        LOG.info("Exception while executing an Federation operation.", e);
+        if (++retry > retryCount) {
+          LOG.info("Maxed out Federation retries. Giving up!");
+          throw e;
+        }
+        LOG.info("Retrying operation on Federation. Retry no. {}", retry);
+        // Thread.sleep rejects negative values; treat them like zero.
+        if (retrySleepTime > 0) {
+          try {
+            Thread.sleep(retrySleepTime);
+          } catch (InterruptedException ie) {
+            // Callers typically catch Exception broadly, so restore the interrupt flag
+            // and keep the failure that triggered this retry visible.
+            Thread.currentThread().interrupt();
+            ie.addSuppressed(e);
+            throw ie;
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java
new file mode 100644
index 00000000000..5d8477cfe59
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Federation Retry Policies. **/
+package org.apache.hadoop.yarn.server.federation.retry;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index f847152c47d..87596e16ee6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -28,9 +28,11 @@ import java.util.concurrent.Future;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
+import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,6 +116,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private boolean nodeLabelsEnabled;
private Set<String> exclusiveEnforcedPartitions;
private String amDefaultNodeLabel;
+ private FederationStateStoreService federationStateStoreService;
private static final String USER_ID_PREFIX = "userid=";
@@ -347,6 +350,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
+ ", removing app " + removeApp.getApplicationId()
+ " from state store.");
rmContext.getStateStore().removeApplication(removeApp);
+ removeApplicationIdFromStateStore(removeId);
completedAppsInStateStore--;
}
@@ -358,6 +362,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
+ this.maxCompletedAppsInMemory + ", removing app " + removeId
+ " from memory: ");
rmContext.getRMApps().remove(removeId);
+ removeApplicationIdFromStateStore(removeId);
this.applicationACLsManager.removeApplication(removeId);
}
}
@@ -1054,4 +1059,42 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
context.setQueue(placementContext.getQueue());
}
}
+
+  /**
+   * Sets the service used to remove finished applications from the
+   * FederationStateStore. ResourceManager calls this during federation setup,
+   * so this is production wiring rather than a test-only hook.
+   *
+   * @param stateStoreService the federation state store service; when null,
+   *     federation cleanup of finished applications is skipped.
+   */
+  public void setFederationStateStoreService(FederationStateStoreService stateStoreService) {
+    this.federationStateStoreService = stateStoreService;
+  }
+
+  /**
+   * Removes the application's entry from the FederationStateStore.
+   *
+   * Does nothing unless federation is enabled and a FederationStateStoreService
+   * has been set. The outcome is only logged; no exception reaches the caller.
+   * A {@code false} result from the service means the entry was either not
+   * deleted or did not need deleting.
+   *
+   * @param appId id of the application to remove.
+   */
+  private void removeApplicationIdFromStateStore(ApplicationId appId) {
+    boolean canCleanUp =
+        HAUtil.isFederationEnabled(conf) && federationStateStoreService != null;
+    if (!canCleanUp) {
+      return;
+    }
+    try {
+      // isQuery = true: consult the StateStore before deleting the entry.
+      if (federationStateStoreService.cleanUpFinishApplicationsWithRetries(appId, true)) {
+        LOG.info("applicationId = {} remove from state store success.", appId);
+      } else {
+        LOG.warn("applicationId = {} remove from state store failed.", appId);
+      }
+    } catch (Exception e) {
+      LOG.error("applicationId = {} remove from state store error.", appId, e);
+    }
+  }
+
+  /**
+   * Invokes {@code checkAppNumCompletedLimit()} so tests can trigger it directly.
+   */
+  @VisibleForTesting
+  public void checkAppNumCompletedLimit4Test() {
+    this.checkAppNumCompletedLimit();
+  }
+
+  /**
+   * Invokes {@code finishApplication(applicationId)} so tests can trigger it directly.
+   *
+   * @param applicationId the application passed through to {@code finishApplication}.
+   */
+  @VisibleForTesting
+  public void finishApplication4Test(ApplicationId applicationId) {
+    this.finishApplication(applicationId);
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 8adcff42a69..1bcfdbbafa8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -917,6 +917,7 @@ public class ResourceManager extends CompositeService
}
federationStateStoreService = createFederationStateStoreService();
addIfService(federationStateStoreService);
+ rmAppManager.setFederationStateStoreService(federationStateStoreService);
LOG.info("Initialized Federation membership.");
}
@@ -996,6 +997,13 @@ public class ResourceManager extends CompositeService
RMState state = rmStore.loadState();
recover(state);
LOG.info("Recovery ended");
+
+ // Make sure that the App is cleaned up after the RM memory is restored.
+ if (HAUtil.isFederationEnabled(conf)) {
+ federationStateStoreService.
+ createCleanUpFinishApplicationThread("Recovery");
+ }
+
} catch (Exception e) {
// the Exception from loadState() needs to be handled for
// HA and we need to give up master status if we got fenced
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
index 060540d01ee..1d67af926d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.federation;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.List;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
@@ -29,9 +32,11 @@ import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
@@ -74,10 +79,12 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationH
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,6 +109,9 @@ public class FederationStateStoreService extends AbstractService
private long heartbeatInterval;
private long heartbeatInitialDelay;
private RMContext rmContext;
+ private String cleanUpThreadNamePrefix = "FederationStateStoreService-Clean-Thread";
+ private int cleanUpRetryCountNum;
+ private long cleanUpRetrySleepTime;
public FederationStateStoreService(RMContext rmContext) {
super(FederationStateStoreService.class.getName());
@@ -149,6 +159,15 @@ public class FederationStateStoreService extends AbstractService
heartbeatInitialDelay =
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY;
}
+
+ cleanUpRetryCountNum = conf.getInt(YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT,
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT);
+
+ cleanUpRetrySleepTime = conf.getTimeDuration(
+ YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME,
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME,
+ TimeUnit.MILLISECONDS);
+
LOG.info("Initialized federation membership service.");
super.serviceInit(conf);
@@ -378,4 +397,160 @@ public class FederationStateStoreService extends AbstractService
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
+
+  /**
+   * Starts a new thread that removes finished applications from the FederationStateStore.
+   *
+   * @param stage label appended to the thread name to identify the trigger
+   *     (for example "Recovery").
+   */
+  public void createCleanUpFinishApplicationThread(String stage) {
+    final String threadName = cleanUpThreadNamePrefix + "-" + stage;
+    final Thread cleaner = new Thread(createCleanUpFinishApplicationThread());
+    cleaner.setName(threadName);
+    cleaner.start();
+    LOG.info("CleanUpFinishApplicationThread has been started {}.", threadName);
+  }
+
+  /**
+   * Builds the task executed by the cleanup thread.
+   *
+   * @return a Runnable that performs a single cleanup pass.
+   */
+  private Runnable createCleanUpFinishApplicationThread() {
+    return this::createCleanUpFinishApplication;
+  }
+
+  /**
+   * Removes finished applications of this sub-cluster from the FederationStateStore.
+   *
+   * All applications whose home sub-cluster is this RM's sub-cluster are fetched from
+   * the StateStore. Every one that is not present in the RM's in-memory application map
+   * is deleted, without re-querying the StateStore for ownership (isQuery = false).
+   * Nothing is deleted when the RM holds no applications in memory. Failures are logged
+   * and never propagated.
+   *
+   * NOTE(review): an application already registered in the StateStore but not yet in RM
+   * memory would also be treated as finished here; confirm this cannot happen when the
+   * cleanup runs.
+   */
+  private void createCleanUpFinishApplication() {
+    try {
+      // Get the current RM's App list based on subClusterId
+      GetApplicationsHomeSubClusterRequest request =
+          GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
+      GetApplicationsHomeSubClusterResponse response =
+          getApplicationsHomeSubCluster(request);
+      List<ApplicationHomeSubCluster> applicationHomeSCs =
+          (response == null) ? null : response.getAppsHomeSubClusters();
+      if (applicationHomeSCs == null) {
+        LOG.warn("No application list returned from the state store for subCluster = {}, "
+            + "skipping cleanup.", subClusterId);
+        return;
+      }
+
+      // Save a local copy of the map so that it won't change while we iterate.
+      Map<ApplicationId, RMApp> rmApps = new HashMap<>(this.rmContext.getRMApps());
+
+      // Traverse the app list and clean up every app the RM no longer holds in memory.
+      // Skip cleanup entirely when the RM holds no apps.
+      long successCleanUpAppCount = 0;
+      if (!rmApps.isEmpty()) {
+        for (ApplicationHomeSubCluster applicationHomeSC : applicationHomeSCs) {
+          ApplicationId applicationId = applicationHomeSC.getApplicationId();
+          if (rmApps.containsKey(applicationId)) {
+            continue;
+          }
+          try {
+            boolean cleanUpSuccess = cleanUpFinishApplicationsWithRetries(applicationId, false);
+            if (cleanUpSuccess) {
+              LOG.info("application = {} has been cleaned up successfully.", applicationId);
+              successCleanUpAppCount++;
+            }
+          } catch (Exception e) {
+            LOG.error("problem during application = {} cleanup.", applicationId, e);
+          }
+        }
+      }
+
+      // print app cleanup log
+      LOG.info("cleanup finished applications size = {}, number = {} successful cleanup.",
+          applicationHomeSCs.size(), successCleanUpAppCount);
+    } catch (Exception e) {
+      LOG.error("problem during cleanup applications.", e);
+    }
+  }
+
+  /**
+   * Deletes a finished application's entry from the FederationStateStore, retrying on
+   * failure using the configured retry count and sleep time.
+   *
+   * @param appId id of the application to clean up.
+   * @param isQuery true to query the StateStore first and delete only when this
+   *     sub-cluster is the application's home; false to delete unconditionally.
+   * @return true if the entry was deleted; false if deletion failed or was not needed.
+   * @throws Exception the last failure once all retries are exhausted.
+   */
+  public boolean cleanUpFinishApplicationsWithRetries(ApplicationId appId, boolean isQuery)
+      throws Exception {
+    final DeleteApplicationHomeSubClusterRequest deleteRequest =
+        DeleteApplicationHomeSubClusterRequest.newInstance(appId);
+    final FederationActionRetry<Boolean> cleanUpAction =
+        () -> invokeCleanUpFinishApp(appId, isQuery, deleteRequest);
+    return cleanUpAction.runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime);
+  }
+
+  /**
+   * Performs one attempt at deleting a finished application from the StateStore.
+   *
+   * @param applicationId id of the application.
+   * @param isQuery true to check ownership in the StateStore before deleting.
+   * @param delRequest prepared delete request for the application.
+   * @return true if the delete call returned a response; false if the ownership check
+   *     rejected the deletion or the delete call returned null.
+   * @throws YarnException if the delete call fails.
+   */
+  private boolean invokeCleanUpFinishApp(ApplicationId applicationId, boolean isQuery,
+      DeleteApplicationHomeSubClusterRequest delRequest) throws YarnException {
+    // Only consult the StateStore when the caller asked for an ownership check.
+    if (isQuery && !isApplicationNeedClean(applicationId)) {
+      return false;
+    }
+    DeleteApplicationHomeSubClusterResponse deleteResponse =
+        deleteApplicationHomeSubCluster(delRequest);
+    if (deleteResponse == null) {
+      return false;
+    }
+    LOG.info("The applicationId = {} has been successfully cleaned up.", applicationId);
+    return true;
+  }
+
+  /**
+   * Decides whether this RM may delete the application's entry from the StateStore.
+   *
+   * The application's home sub-cluster is looked up in the StateStore; deletion is
+   * allowed only when it equals this RM's sub-cluster. A missing response, or any
+   * exception during the lookup (for example, the application no longer exists),
+   * results in false.
+   *
+   * @param applicationId id of the application.
+   * @return true if the application should be cleaned up; false otherwise.
+   */
+  private boolean isApplicationNeedClean(ApplicationId applicationId) {
+    GetApplicationHomeSubClusterRequest homeQuery =
+        GetApplicationHomeSubClusterRequest.newInstance(applicationId);
+    try {
+      GetApplicationHomeSubClusterResponse homeResp = getApplicationHomeSubCluster(homeQuery);
+      if (homeResp == null) {
+        LOG.warn("The applicationId = {} not belong subCluster = {} " +
+            " and is not allowed to delete.", applicationId, subClusterId);
+        return false;
+      }
+      SubClusterId owner = homeResp.getApplicationHomeSubCluster().getHomeSubCluster();
+      if (subClusterId.equals(owner)) {
+        return true;
+      }
+      LOG.warn("The homeSubCluster of applicationId = {} belong subCluster = {}, " +
+          " not belong subCluster = {} and is not allowed to delete.",
+          applicationId, owner, subClusterId);
+      return false;
+    } catch (Exception e) {
+      LOG.warn("query applicationId = {} error.", applicationId, e);
+      return false;
+    }
+  }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
index e8ebdd5bedd..b8e2ce6ef32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
@@ -20,22 +20,46 @@ package org.apache.hadoop.yarn.server.resourcemanager.federation;
import java.io.IOException;
import java.io.StringReader;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import javax.xml.bind.JAXBException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.junit.After;
import org.junit.Assert;
@@ -46,6 +70,8 @@ import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONUnmarshaller;
+import static org.mockito.Mockito.mock;
+
/**
* Unit tests for FederationStateStoreService.
*/
@@ -207,4 +233,253 @@ public class TestFederationRMStateStoreService {
"Started federation membership heartbeat with interval: 300 and initial delay: 10"));
rm.stop();
}
+
+  @Test
+  public void testCleanUpApplication() throws Exception {
+
+    // set yarn configuration
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+
+    // set up MockRM
+    final MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    try {
+      stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+      rm.start();
+
+      // init subCluster Heartbeat,
+      // and check that the subCluster is in a running state
+      FederationStateStoreService stateStoreService =
+          rm.getFederationStateStoreService();
+      FederationStateStoreHeartbeat storeHeartbeat =
+          stateStoreService.getStateStoreHeartbeatThread();
+      storeHeartbeat.run();
+      checkSubClusterInfo(SubClusterState.SC_RUNNING);
+
+      // generate an application and join the [SC-1] cluster
+      ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
+      addApplication2StateStore(appId, stateStore);
+
+      // make sure the app can be queried in the stateStore
+      GetApplicationHomeSubClusterRequest appRequest =
+          GetApplicationHomeSubClusterRequest.newInstance(appId);
+      GetApplicationHomeSubClusterResponse response =
+          stateStore.getApplicationHomeSubCluster(appRequest);
+      Assert.assertNotNull(response);
+      ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster();
+      Assert.assertNotNull(appHomeSubCluster);
+      Assert.assertNotNull(appHomeSubCluster.getApplicationId());
+      Assert.assertEquals(appId, appHomeSubCluster.getApplicationId());
+
+      // clean up the app.
+      boolean cleanUpResult =
+          stateStoreService.cleanUpFinishApplicationsWithRetries(appId, true);
+      Assert.assertTrue(cleanUpResult);
+
+      // after clean, the app can no longer be queried from the stateStore.
+      LambdaTestUtils.intercept(FederationStateStoreException.class,
+          "Application " + appId + " does not exist",
+          () -> stateStore.getApplicationHomeSubCluster(appRequest));
+    } finally {
+      // Always stop the RM so its services and threads do not leak into other tests.
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testCleanUpApplicationWhenRMStart() throws Exception {
+
+    // We design such a test case.
+    // Step1. We add app01, app02, app03 to the stateStore,
+    // But these apps are not in RM's RMContext, they are finished apps
+    // Step2. We simulate RM startup, there is only app04 in RMContext.
+    // Step3. We wait up to 5 seconds for the automatic cleanup thread
+    // to clean up the finished apps.
+
+    // set yarn configuration.
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+
+    // set up MockRM.
+    final MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    try {
+      stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+
+      // generate an [app01] and join the [SC-1] cluster.
+      List<ApplicationId> appIds = new ArrayList<>();
+      ApplicationId appId01 = ApplicationId.newInstance(Time.now(), 1);
+      addApplication2StateStore(appId01, stateStore);
+      appIds.add(appId01);
+
+      // generate an [app02] and join the [SC-1] cluster.
+      ApplicationId appId02 = ApplicationId.newInstance(Time.now(), 2);
+      addApplication2StateStore(appId02, stateStore);
+      appIds.add(appId02);
+
+      // generate an [app03] and join the [SC-1] cluster.
+      ApplicationId appId03 = ApplicationId.newInstance(Time.now(), 3);
+      addApplication2StateStore(appId03, stateStore);
+      appIds.add(appId03);
+
+      // make sure the apps can be queried in the stateStore.
+      GetApplicationsHomeSubClusterRequest allRequest =
+          GetApplicationsHomeSubClusterRequest.newInstance(subClusterId);
+      GetApplicationsHomeSubClusterResponse allResponse =
+          stateStore.getApplicationsHomeSubCluster(allRequest);
+      Assert.assertNotNull(allResponse);
+      List<ApplicationHomeSubCluster> appHomeSCLists = allResponse.getAppsHomeSubClusters();
+      Assert.assertNotNull(appHomeSCLists);
+      Assert.assertEquals(3, appHomeSCLists.size());
+
+      // app04 exists in both RM memory and stateStore.
+      ApplicationId appId04 = ApplicationId.newInstance(Time.now(), 4);
+      addApplication2StateStore(appId04, stateStore);
+      addApplication2RMAppManager(rm, appId04);
+
+      // start rm.
+      rm.start();
+
+      // wait up to 5s for the cleanup thread to finish.
+      GenericTestUtils.waitFor(() -> {
+        int appsSize = 0;
+        try {
+          List<ApplicationHomeSubCluster> subClusters =
+              getApplicationsFromStateStore();
+          Assert.assertNotNull(subClusters);
+          appsSize = subClusters.size();
+        } catch (YarnException e) {
+          e.printStackTrace();
+        }
+        return (appsSize == 1);
+      }, 100, 1000 * 5);
+
+      // check the app to make sure the apps(app01,app02,app03) doesn't exist.
+      for (ApplicationId appId : appIds) {
+        GetApplicationHomeSubClusterRequest appRequest =
+            GetApplicationHomeSubClusterRequest.newInstance(appId);
+        LambdaTestUtils.intercept(FederationStateStoreException.class,
+            "Application " + appId + " does not exist",
+            () -> stateStore.getApplicationHomeSubCluster(appRequest));
+      }
+    } finally {
+      // Stop the RM even when an assertion fails so it does not leak into other tests.
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testCleanUpApplicationWhenRMCompleteOneApp() throws Exception {
+
+    // Test scenario:
+    // Step1. Start the RM with RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS = 1.
+    // Step2. Register app[01-03] in both the RM memory and the federation stateStore.
+    // Step3. Finish app01, app02 and app03, then run the completed-app limit check.
+    //        app01 and app02 should be removed from the stateStore; app03 should remain.
+
+    // set yarn configuration.
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+
+    // set up MockRM.
+    // NOTE(review): MockRM's constructor may already initialize the service;
+    // confirm whether this explicit init() call is needed.
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+    rm.start();
+
+    // Always stop the RM, even when an assertion below fails, so the started
+    // RM does not outlive this test.
+    try {
+      // generate an [app01] and join the [SC-1] cluster.
+      List<ApplicationId> appIds = new ArrayList<>();
+      ApplicationId appId01 = ApplicationId.newInstance(Time.now(), 1);
+      addApplication2StateStore(appId01, stateStore);
+      addApplication2RMAppManager(rm, appId01);
+      appIds.add(appId01);
+
+      // generate an [app02] and join the [SC-1] cluster.
+      ApplicationId appId02 = ApplicationId.newInstance(Time.now(), 2);
+      addApplication2StateStore(appId02, stateStore);
+      addApplication2RMAppManager(rm, appId02);
+      appIds.add(appId02);
+
+      // generate an [app03] and join the [SC-1] cluster. app03 is expected to
+      // survive the cleanup, so it is deliberately not added to appIds.
+      ApplicationId appId03 = ApplicationId.newInstance(Time.now(), 3);
+      addApplication2StateStore(appId03, stateStore);
+      addApplication2RMAppManager(rm, appId03);
+
+      // Finish all three apps through the RMAppManager test hooks, then run
+      // the completed-app limit check.
+      RMAppManager rmAppManager = rm.getRMAppManager();
+      rmAppManager.finishApplication4Test(appId01);
+      rmAppManager.finishApplication4Test(appId02);
+      rmAppManager.finishApplication4Test(appId03);
+      rmAppManager.checkAppNumCompletedLimit4Test();
+
+      // app01 and app02 should have been removed from the stateStore,
+      // so querying them should fail with "does not exist".
+      for (ApplicationId appId : appIds) {
+        GetApplicationHomeSubClusterRequest appRequest =
+            GetApplicationHomeSubClusterRequest.newInstance(appId);
+        LambdaTestUtils.intercept(FederationStateStoreException.class,
+            "Application " + appId + " does not exist",
+            () -> stateStore.getApplicationHomeSubCluster(appRequest));
+      }
+
+      // app03 should be the only application left in the stateStore.
+      List<ApplicationHomeSubCluster> appHomeScList = getApplicationsFromStateStore();
+      Assert.assertNotNull(appHomeScList);
+      Assert.assertEquals(1, appHomeScList.size());
+      ApplicationHomeSubCluster homeSubCluster = appHomeScList.get(0);
+      Assert.assertNotNull(homeSubCluster);
+      Assert.assertEquals(appId03, homeSubCluster.getApplicationId());
+    } finally {
+      rm.stop();
+    }
+  }
+
+  /**
+   * Registers {@code appId} in the given federation state store, using the
+   * test's {@code subClusterId} as the application's home sub-cluster.
+   *
+   * @param appId the application to register.
+   * @param fedStateStore the federation state store to write to.
+   * @throws YarnException propagated from the state store call.
+   */
+  private void addApplication2StateStore(ApplicationId appId,
+      FederationStateStore fedStateStore) throws YarnException {
+    AddApplicationHomeSubClusterRequest request =
+        AddApplicationHomeSubClusterRequest.newInstance(
+            ApplicationHomeSubCluster.newInstance(appId, subClusterId));
+    fedStateStore.addApplicationHomeSubCluster(request);
+  }
+
+  /**
+   * Fetches every application whose home sub-cluster is the test's
+   * {@code subClusterId} from the federation state store.
+   *
+   * <p>Fails the test (via {@link Assert}) if either the response or the
+   * contained list is null.
+   *
+   * @return the non-null list of application/home-sub-cluster entries.
+   * @throws YarnException propagated from the state store query.
+   */
+  private List<ApplicationHomeSubCluster> getApplicationsFromStateStore() throws YarnException {
+    GetApplicationsHomeSubClusterResponse response =
+        stateStore.getApplicationsHomeSubCluster(
+            GetApplicationsHomeSubClusterRequest.newInstance(subClusterId));
+    Assert.assertNotNull(response);
+
+    List<ApplicationHomeSubCluster> homeSubClusters = response.getAppsHomeSubClusters();
+    Assert.assertNotNull(homeSubClusters);
+    return homeSubClusters;
+  }
+
+  /**
+   * Builds an {@link RMAppImpl} for {@code appId} with a random user, name and
+   * queue, and puts it into the RM context's application map unless an entry
+   * for that id is already present.
+   *
+   * @param rm the MockRM whose RMContext receives the application.
+   * @param appId the id of the application to create.
+   */
+  private void addApplication2RMAppManager(MockRM rm, ApplicationId appId) {
+    RMContext rmContext = rm.getRMContext();
+    Map<ApplicationId, RMApp> rmAppMaps = rmContext.getRMApps();
+    String user = MockApps.newUserName();
+    String name = MockApps.newAppName();
+    String queue = MockApps.newQueue();
+
+    // This helper only constructs the RMApp object and registers it in the
+    // context map; it does not submit it, so a mocked scheduler is used.
+    YarnScheduler scheduler = mock(YarnScheduler.class);
+
+    ApplicationMasterService masterService =
+        new ApplicationMasterService(rmContext, scheduler);
+
+    ApplicationSubmissionContext submissionContext =
+        new ApplicationSubmissionContextPBImpl();
+
+    // Keep the submission context's applicationId consistent with the RMApp's id.
+    submissionContext.setApplicationId(appId);
+    submissionContext.setPriority(Priority.newInstance(0));
+
+    // Time.now() matches the clock used for the ApplicationIds elsewhere in this test.
+    RMApp application = new RMAppImpl(appId, rmContext, conf, name,
+        user, queue, submissionContext, scheduler, masterService,
+        Time.now(), "YARN", null,
+        new ArrayList<>());
+
+    rmAppMaps.putIfAbsent(application.getApplicationId(), application);
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org