You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2018/09/22 00:54:09 UTC
hadoop git commit: YARN-7599. [GPG] ApplicationCleaner in Global
Policy Generator. Contributed by Botong Huang.
Repository: hadoop
Updated Branches:
refs/heads/YARN-7402 ef4d71c0b -> 3671dc3ef
YARN-7599. [GPG] ApplicationCleaner in Global Policy Generator. Contributed by Botong Huang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3671dc3e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3671dc3e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3671dc3e
Branch: refs/heads/YARN-7402
Commit: 3671dc3eff8b4de8ba33922204aa00af98ea20ba
Parents: ef4d71c
Author: Botong Huang <bo...@apache.org>
Authored: Fri Sep 21 17:30:44 2018 -0700
Committer: Botong Huang <bo...@apache.org>
Committed: Fri Sep 21 17:30:44 2018 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 25 +++
.../src/main/resources/yarn-default.xml | 28 ++++
.../store/impl/MemoryFederationStateStore.java | 2 -
.../pb/ApplicationHomeSubClusterPBImpl.java | 3 +
.../utils/FederationStateStoreFacade.java | 33 ++++
.../server/globalpolicygenerator/GPGUtils.java | 21 ++-
.../GlobalPolicyGenerator.java | 23 ++-
.../applicationcleaner/ApplicationCleaner.java | 154 +++++++++++++++++++
.../DefaultApplicationCleaner.java | 82 ++++++++++
.../applicationcleaner/package-info.java | 19 +++
.../TestDefaultApplicationCleaner.java | 130 ++++++++++++++++
11 files changed, 513 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3671dc3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 54e29a0..1464892 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3383,6 +3383,31 @@ public class YarnConfiguration extends Configuration {
FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms";
public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000;
+ // The application cleaner class to use
+ public static final String GPG_APPCLEANER_CLASS =
+ FEDERATION_GPG_PREFIX + "application.cleaner.class";
+ public static final String DEFAULT_GPG_APPCLEANER_CLASS =
+ "org.apache.hadoop.yarn.server.globalpolicygenerator"
+ + ".applicationcleaner.DefaultApplicationCleaner";
+
+ // The interval at which the application cleaner runs, -1 means disabled
+ public static final String GPG_APPCLEANER_INTERVAL_MS =
+ FEDERATION_GPG_PREFIX + "application.cleaner.interval-ms";
+ public static final long DEFAULT_GPG_APPCLEANER_INTERVAL_MS = -1;
+
+ /**
+ * Specifications on how (many times) to contact Router for apps. We need to
+ * do this because Router might return a partial application list when some
+ * sub-cluster RM is not responsive (e.g. failing over).
+ *
+ * Should have three comma-separated values: minimum number of successful
+ * queries, maximum number of total attempts, retry interval (ms).
+ */
+ public static final String GPG_APPCLEANER_CONTACT_ROUTER_SPEC =
+ FEDERATION_GPG_PREFIX + "application.cleaner.contact.router.spec";
+ public static final String DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC =
+ "3,10,600000";
+
public static final String FEDERATION_GPG_POLICY_PREFIX =
FEDERATION_GPG_PREFIX + "policy.generator.";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3671dc3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 9e71cc6..39871df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3635,6 +3635,34 @@
<property>
<description>
+ The Application Cleaner implementation class for GPG to use.
+ </description>
+ <name>yarn.federation.gpg.application.cleaner.class</name>
+ <value>org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.DefaultApplicationCleaner</value>
+ </property>
+
+ <property>
+ <description>
+ The interval at which the application cleaner runs, -1 means disabled.
+ </description>
+ <name>yarn.federation.gpg.application.cleaner.interval-ms</name>
+ <value>-1</value>
+ </property>
+
+ <property>
+ <description>
+ Specifications on how (many times) to contact Router for apps. We need to
+ do this because Router might return a partial application list when some
+ sub-cluster RM is not responsive (e.g. failing over).
+ Should have three comma-separated values: minimum number of successful
+ queries, maximum number of total attempts, retry interval (ms).
+ </description>
+ <name>yarn.federation.gpg.application.cleaner.contact.router.spec</name>
+ <value>3,10,600000</value>
+ </property>
+
+ <property>
+ <description>
The interval at which the policy generator runs, default is one hour
</description>
<name>yarn.federation.gpg.policy.generator.interval-ms</name>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3671dc3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index b42fc79..f7cdcd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -259,8 +259,6 @@ public class MemoryFederationStateStore implements FederationStateStore {
result
.add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue()));
}
-
- GetApplicationsHomeSubClusterResponse.newInstance(result);
return GetApplicationsHomeSubClusterResponse.newInstance(result);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3671dc3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java
index 7e6a564..6bd80fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java
@@ -110,6 +110,9 @@ public class ApplicationHomeSubClusterPBImpl extends ApplicationHomeSubCluster {
@Override
public ApplicationId getApplicationId() {
ApplicationHomeSubClusterProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.applicationId != null) {
+ return this.applicationId;
+ }
if (!p.hasApplicationId()) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3671dc3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index d10e568..df5f50c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -52,8 +52,11 @@ import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateS
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
@@ -433,6 +436,36 @@ public final class FederationStateStoreFacade {
}
/**
+ * Get the {@code ApplicationHomeSubCluster} list representing the mapping of
+ * each submitted application to its home sub-cluster.
+ *
+ * @return the mapping of each submitted application to its home sub-cluster
+ * @throws YarnException if the request is invalid/fails
+ */
+ public List<ApplicationHomeSubCluster> getApplicationsHomeSubCluster()
+ throws YarnException {
+ GetApplicationsHomeSubClusterResponse response =
+ stateStore.getApplicationsHomeSubCluster(
+ GetApplicationsHomeSubClusterRequest.newInstance());
+ return response.getAppsHomeSubClusters();
+ }
+
+ /**
+ * Delete the mapping of home {@code SubClusterId} of a previously submitted
+ * {@code ApplicationId}. Returns normally if the operation is successful;
+ * otherwise an exception reporting the reason for the failure is thrown.
+ *
+ * @param applicationId the application to delete the home sub-cluster of
+ * @throws YarnException if the request is invalid/fails
+ */
+ public void deleteApplicationHomeSubCluster(ApplicationId applicationId)
+ throws YarnException {
+ stateStore.deleteApplicationHomeSubCluster(
+ DeleteApplicationHomeSubClusterRequest.newInstance(applicationId));
+ return;
+ }
+
+ /**
* Get the singleton instance of SubClusterResolver.
*
* @return SubClusterResolver instance
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3671dc3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
index 31cee1c..615cf3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
@@ -49,15 +50,19 @@ public final class GPGUtils {
* Performs an invocation of the the remote RMWebService.
*/
public static <T> T invokeRMWebService(Configuration conf, String webAddr,
- String path, final Class<T> returnType) {
+ String path, final Class<T> returnType, String deSelectParam) {
Client client = Client.create();
T obj = null;
- WebResource webResource = client.resource(webAddr);
+ WebResource webResource =
+ client.resource(webAddr).path("ws/v1/cluster").path(path);
+ if (deSelectParam != null) {
+ webResource = webResource.queryParam(RMWSConsts.DESELECTS, deSelectParam);
+ }
ClientResponse response = null;
try {
- response = webResource.path("ws/v1/cluster").path(path)
- .accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
+ response = webResource.accept(MediaType.APPLICATION_XML)
+ .get(ClientResponse.class);
if (response.getStatus() == SC_OK) {
obj = response.getEntity(returnType);
} else {
@@ -74,6 +79,14 @@ public final class GPGUtils {
}
/**
+ * Performs an invocation of the remote RMWebService.
+ */
+ public static <T> T invokeRMWebService(Configuration conf, String webAddr,
+ String path, final Class<T> returnType) {
+ return invokeRMWebService(conf, webAddr, path, returnType, null);
+ }
+
+ /**
* Creates a uniform weighting of 1.0 for each sub cluster.
*/
public static Map<SubClusterIdInfo, Float> createUniformWeights(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3671dc3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index 1ae07f3..c8ec4cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.slf4j.Logger;
@@ -63,6 +64,7 @@ public class GlobalPolicyGenerator extends CompositeService {
// Scheduler service that runs tasks periodically
private ScheduledThreadPoolExecutor scheduledExecutorService;
private SubClusterCleaner subClusterCleaner;
+ private ApplicationCleaner applicationCleaner;
private PolicyGenerator policyGenerator;
public GlobalPolicyGenerator() {
@@ -82,7 +84,15 @@ public class GlobalPolicyGenerator extends CompositeService {
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
+
this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
+
+ this.applicationCleaner = FederationStateStoreFacade.createInstance(conf,
+ YarnConfiguration.GPG_APPCLEANER_CLASS,
+ YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS,
+ ApplicationCleaner.class);
+ this.applicationCleaner.init(conf, this.gpgContext);
+
this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);
DefaultMetricsSystem.initialize(METRICS_NAME);
@@ -95,7 +105,7 @@ public class GlobalPolicyGenerator extends CompositeService {
protected void serviceStart() throws Exception {
super.serviceStart();
- // Scheduler SubClusterCleaner service
+ // Schedule SubClusterCleaner service
long scCleanerIntervalMs = getConfig().getLong(
YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS);
@@ -106,6 +116,17 @@ public class GlobalPolicyGenerator extends CompositeService {
DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
}
+ // Schedule ApplicationCleaner service
+ long appCleanerIntervalMs =
+ getConfig().getLong(YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS);
+ if (appCleanerIntervalMs > 0) {
+ this.scheduledExecutorService.scheduleAtFixedRate(this.applicationCleaner,
+ 0, appCleanerIntervalMs, TimeUnit.MILLISECONDS);
+ LOG.info("Scheduled application cleaner with interval: {}",
+ DurationFormatUtils.formatDurationISO(appCleanerIntervalMs));
+ }
+
// Schedule PolicyGenerator
long policyGeneratorIntervalMillis = getConfig().getLong(
YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3671dc3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
new file mode 100644
index 0000000..85047ef
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The ApplicationCleaner is a runnable that cleans up old applications from
+ * table applicationsHomeSubCluster in FederationStateStore.
+ */
+public abstract class ApplicationCleaner implements Runnable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ApplicationCleaner.class);
+
+  private Configuration conf;
+  private GPGContext gpgContext;
+
+  private int minRouterSuccessCount;
+  private int maxRouterRetry;
+  private long routerQueryIntervalMillis;
+
+  /** Reads and validates the Router contact spec from the configuration. */
+  public void init(Configuration config, GPGContext context)
+      throws YarnException {
+    this.gpgContext = context;
+    this.conf = config;
+
+    String routerSpecString =
+        this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC,
+            YarnConfiguration.DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC);
+    String[] specs = routerSpecString.split(",");
+    if (specs.length != 3) {
+      throw new YarnException("Expect three comma separated values in "
+          + YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC + " but get "
+          + routerSpecString);
+    }
+    this.minRouterSuccessCount = Integer.parseInt(specs[0].trim());
+    this.maxRouterRetry = Integer.parseInt(specs[1].trim());
+    this.routerQueryIntervalMillis = Long.parseLong(specs[2].trim());
+
+    if (this.minRouterSuccessCount > this.maxRouterRetry) {
+      throw new YarnException("minRouterSuccessCount "
+          + this.minRouterSuccessCount
+          + " should not be larger than maxRouterRetry " + this.maxRouterRetry);
+    }
+    if (this.minRouterSuccessCount <= 0) {
+      throw new YarnException("minRouterSuccessCount "
+          + this.minRouterSuccessCount + " should be positive");
+    }
+
+    LOG.info(
+        "Initialized AppCleaner with Router query with min success {}, "
+            + "max retry {}, retry interval {}",
+        this.minRouterSuccessCount, this.maxRouterRetry,
+        DurationFormatUtils.formatDurationISO(this.routerQueryIntervalMillis));
+  }
+
+  public GPGContext getGPGContext() {
+    return this.gpgContext;
+  }
+
+  /**
+   * Query router for applications.
+   *
+   * @return the set of applications
+   * @throws YarnRuntimeException when router call fails
+   */
+  public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
+    String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);
+
+    LOG.info("Contacting router at: {}", webAppAddress);
+    AppsInfo appsInfo = (AppsInfo) GPGUtils.invokeRMWebService(conf,
+        webAppAddress, "apps", AppsInfo.class,
+        DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());
+
+    Set<ApplicationId> appSet = new HashSet<ApplicationId>();
+    for (AppInfo appInfo : appsInfo.getApps()) {
+      appSet.add(ApplicationId.fromString(appInfo.getAppId()));
+    }
+    return appSet;
+  }
+
+  /**
+   * Get the union of applications reported by repeated Router queries.
+   *
+   * @return the set of known applications
+   * @throws YarnException if too few queries succeed within the retry limit
+   */
+  public Set<ApplicationId> getRouterKnownApplications() throws YarnException {
+    int successCount = 0, totalAttemptCount = 0;
+    Set<ApplicationId> resultSet = new HashSet<ApplicationId>();
+    while (totalAttemptCount < this.maxRouterRetry) {
+      try {
+        Set<ApplicationId> routerApps = getAppsFromRouter();
+        resultSet.addAll(routerApps);
+        LOG.info("Attempt {}: {} known apps from Router, {} in total",
+            totalAttemptCount, routerApps.size(), resultSet.size());
+
+        successCount++;
+        if (successCount >= this.minRouterSuccessCount) {
+          return resultSet;
+        }
+
+        // Wait for the next attempt
+        try {
+          Thread.sleep(this.routerQueryIntervalMillis);
+        } catch (InterruptedException e) {
+          LOG.warn("Sleep interrupted after attempt " + totalAttemptCount);
+        }
+      } catch (Exception e) {
+        LOG.warn("Router query attempt " + totalAttemptCount + " failed ", e);
+      } finally {
+        totalAttemptCount++;
+      }
+    }
+    throw new YarnException("Only " + successCount
+        + " success Router queries after " + totalAttemptCount + " retries");
+  }
+
+  @Override
+  public abstract void run();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3671dc3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
new file mode 100644
index 0000000..1ce9840
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+/**
+ * The default ApplicationCleaner that cleans up old applications from table
+ * applicationsHomeSubCluster in FederationStateStore.
+ */
+public class DefaultApplicationCleaner extends ApplicationCleaner {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DefaultApplicationCleaner.class);
+
+  /** Deletes state store app entries that Router does not report. */
+  @Override
+  public void run() {
+    Date now = new Date();
+    LOG.info("Application cleaner run at time {}", now);
+
+    FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
+    Set<ApplicationId> candidates = new HashSet<ApplicationId>();
+    try {
+      List<ApplicationHomeSubCluster> response =
+          facade.getApplicationsHomeSubCluster();
+      for (ApplicationHomeSubCluster app : response) {
+        candidates.add(app.getApplicationId());
+      }
+      LOG.info("{} app entries in FederationStateStore", candidates.size());
+
+      Set<ApplicationId> routerApps = getRouterKnownApplications();
+      LOG.info("{} known applications from Router", routerApps.size());
+
+      // Entries present in the state store but not in Router are deleted
+      candidates = Sets.difference(candidates, routerApps);
+      LOG.info("Deleting {} applications from statestore", candidates.size());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Apps to delete: {}", candidates.stream()
+            .map(Object::toString).collect(Collectors.joining(",")));
+      }
+      for (ApplicationId appId : candidates) {
+        try {
+          facade.deleteApplicationHomeSubCluster(appId);
+        } catch (Exception e) {
+          LOG.error(
+              "deleteApplicationHomeSubCluster failed at application " + appId,
+              e);
+        }
+      }
+    } catch (Throwable e) { // an escaping throwable would cancel the schedule
+      LOG.error("Application cleaner started at time " + now + " fails: ", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3671dc3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
new file mode 100644
index 0000000..dd302c8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3671dc3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
new file mode 100644
index 0000000..ec3f64e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for DefaultApplicationCleaner in GPG, with a stubbed Router query.
+ */
+public class TestDefaultApplicationCleaner {
+  private Configuration conf;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreFacade facade;
+  private ApplicationCleaner appCleaner;
+  private GPGContext gpgContext;
+
+  // Applications that setup() registers in the state store
+  private List<ApplicationId> appIds;
+  // The list of applications returned by mocked router
+  private Set<ApplicationId> routerAppIds;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new YarnConfiguration();
+
+    // No Router query retry
+    conf.set(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, "1,1,0");
+
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(conf);
+
+    facade = FederationStateStoreFacade.getInstance();
+    facade.reinitialize(stateStore, conf);
+
+    gpgContext = new GPGContextImpl();
+    gpgContext.setStateStoreFacade(facade);
+
+    appCleaner = new TestableDefaultApplicationCleaner();
+    appCleaner.init(conf, gpgContext);
+
+    routerAppIds = new HashSet<ApplicationId>();
+    // Register three applications, each homed in its own sub-cluster
+    appIds = new ArrayList<ApplicationId>();
+    for (int i = 0; i < 3; i++) {
+      ApplicationId appId = ApplicationId.newInstance(0, i);
+      appIds.add(appId);
+      SubClusterId subClusterId = SubClusterId.newInstance("SUBCLUSTER-" + i);
+      stateStore.addApplicationHomeSubCluster(
+          AddApplicationHomeSubClusterRequest.newInstance(
+              ApplicationHomeSubCluster.newInstance(appId, subClusterId)));
+    }
+  }
+
+  @After
+  public void breakDown() throws Exception {
+    if (stateStore != null) {
+      stateStore.close();
+    }
+  }
+
+  @Test
+  public void testFederationStateStoreAppsCleanUp() throws YarnException {
+    // Set first app to be still known by Router
+    ApplicationId knownAppId = appIds.get(0);
+    routerAppIds.add(knownAppId);
+
+    // Another random app not in stateStore known by Router
+    routerAppIds.add(ApplicationId.newInstance(100, 200));
+
+    appCleaner.run();
+
+    List<ApplicationHomeSubCluster> remaining = stateStore
+        .getApplicationsHomeSubCluster(
+            GetApplicationsHomeSubClusterRequest.newInstance())
+        .getAppsHomeSubClusters();
+
+    // Only the app still known by Router should be left
+    Assert.assertEquals(1, remaining.size());
+    Assert.assertEquals(knownAppId, remaining.get(0).getApplicationId());
+  }
+
+  /**
+   * Testable version of DefaultApplicationCleaner. Non-static on purpose: it
+   * answers the Router query from routerAppIds of the enclosing test.
+   */
+  public class TestableDefaultApplicationCleaner
+      extends DefaultApplicationCleaner {
+    @Override
+    public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
+      return routerAppIds;
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org