You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2018/08/02 17:18:11 UTC
[48/50] [abbrv] hadoop git commit: YARN-6648. [GPG] Add
SubClusterCleaner in Global Policy Generator. (botong)
YARN-6648. [GPG] Add SubClusterCleaner in Global Policy Generator. (botong)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f833e1b3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f833e1b3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f833e1b3
Branch: refs/heads/YARN-7402
Commit: f833e1b300758e7c7622e2ca93c2dd164ec6d73d
Parents: 48a8379
Author: Botong Huang <bo...@apache.org>
Authored: Thu Feb 1 14:43:48 2018 -0800
Committer: Botong Huang <bo...@apache.org>
Committed: Thu Aug 2 09:59:48 2018 -0700
----------------------------------------------------------------------
.../dev-support/findbugs-exclude.xml | 5 +
.../hadoop/yarn/conf/YarnConfiguration.java | 18 +++
.../src/main/resources/yarn-default.xml | 24 ++++
.../store/impl/MemoryFederationStateStore.java | 13 ++
.../utils/FederationStateStoreFacade.java | 41 ++++++-
.../GlobalPolicyGenerator.java | 92 ++++++++++-----
.../subclustercleaner/SubClusterCleaner.java | 109 +++++++++++++++++
.../subclustercleaner/package-info.java | 19 +++
.../TestSubClusterCleaner.java | 118 +++++++++++++++++++
9 files changed, 409 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f833e1b3/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 216c3bd..9fcafad 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -387,6 +387,11 @@
<Method name="initAndStartNodeManager" />
<Bug pattern="DM_EXIT" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.yarn.server.globalpolicygenerator.GlobalPolicyGenerator" />
+ <Method name="startGPG" />
+ <Bug pattern="DM_EXIT" />
+ </Match>
<!-- Ignore heartbeat exception when killing localizer -->
<Match>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f833e1b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index bbf877f..ec88411 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3342,6 +3342,24 @@ public class YarnConfiguration extends Configuration {
public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
false;
+ private static final String FEDERATION_GPG_PREFIX =
+ FEDERATION_PREFIX + "gpg.";
+
+ // The number of threads to use for the GPG scheduled executor service
+ public static final String GPG_SCHEDULED_EXECUTOR_THREADS =
+ FEDERATION_GPG_PREFIX + "scheduled.executor.threads";
+ public static final int DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS = 10;
+
+ // The interval at which the subcluster cleaner runs, -1 means disabled
+ public static final String GPG_SUBCLUSTER_CLEANER_INTERVAL_MS =
+ FEDERATION_GPG_PREFIX + "subcluster.cleaner.interval-ms";
+ public static final long DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = -1;
+
+ // The expiration time for a subcluster heartbeat, default is 30 minutes
+ public static final String GPG_SUBCLUSTER_EXPIRATION_MS =
+ FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms";
+ public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000;
+
////////////////////////////////
// Other Configs
////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f833e1b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 2cc842f..66493f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3533,6 +3533,30 @@
<property>
<description>
+ The number of threads to use for the GPG scheduled executor service.
+ </description>
+ <name>yarn.federation.gpg.scheduled.executor.threads</name>
+ <value>10</value>
+ </property>
+
+ <property>
+ <description>
+ The interval at which the subcluster cleaner runs, -1 means disabled.
+ </description>
+ <name>yarn.federation.gpg.subcluster.cleaner.interval-ms</name>
+ <value>-1</value>
+ </property>
+
+ <property>
+ <description>
+ The expiration time for a subcluster heartbeat, default is 30 minutes.
+ </description>
+ <name>yarn.federation.gpg.subcluster.heartbeat.expiration-ms</name>
+ <value>1800000</value>
+ </property>
+
+ <property>
+ <description>
It is TimelineClient 1.5 configuration whether to store active
application’s timeline data with in user directory i.e
${yarn.timeline-service.entity-group-fs-store.active-dir}/${user.name}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f833e1b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index 7c06256..b42fc79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -68,6 +68,8 @@ import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* In-memory implementation of {@link FederationStateStore}.
*/
@@ -158,6 +160,17 @@ public class MemoryFederationStateStore implements FederationStateStore {
return SubClusterHeartbeatResponse.newInstance();
}
+ @VisibleForTesting
+ public void setSubClusterLastHeartbeat(SubClusterId subClusterId,
+ long lastHeartbeat) throws YarnException {
+ SubClusterInfo subClusterInfo = membership.get(subClusterId);
+ if (subClusterInfo == null) {
+ throw new YarnException(
+ "Subcluster " + subClusterId.toString() + " does not exist");
+ }
+ subClusterInfo.setLastHeartBeat(lastHeartbeat);
+ }
+
@Override
public GetSubClusterInfoResponse getSubCluster(
GetSubClusterInfoRequest request) throws YarnException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f833e1b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 1bcb0f4..4c3bed0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -62,9 +62,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolic
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -221,6 +223,22 @@ public final class FederationStateStoreFacade {
}
/**
+ * Deregister a <em>subcluster</em> identified by {@code SubClusterId} to
+ * change state in federation. This can be done to mark the sub cluster lost,
+ * deregistered, or decommissioned.
+ *
+ * @param subClusterId the target subclusterId
+ * @param subClusterState the state to update it to
+ * @throws YarnException if the request is invalid/fails
+ */
+ public void deregisterSubCluster(SubClusterId subClusterId,
+ SubClusterState subClusterState) throws YarnException {
+ stateStore.deregisterSubCluster(
+ SubClusterDeregisterRequest.newInstance(subClusterId, subClusterState));
+ return;
+ }
+
+ /**
* Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}.
*
* @param subClusterId the identifier of the sub-cluster
@@ -255,8 +273,7 @@ public final class FederationStateStoreFacade {
public SubClusterInfo getSubCluster(final SubClusterId subClusterId,
final boolean flushCache) throws YarnException {
if (flushCache && isCachingEnabled()) {
- LOG.info("Flushing subClusters from cache and rehydrating from store,"
- + " most likely on account of RM failover.");
+ LOG.info("Flushing subClusters from cache and rehydrating from store.");
cache.remove(buildGetSubClustersCacheRequest(false));
}
return getSubCluster(subClusterId);
@@ -287,6 +304,26 @@ public final class FederationStateStoreFacade {
}
/**
+ * Updates the cache with the central {@link FederationStateStore} and returns
+ * the {@link SubClusterInfo} of all active sub cluster(s).
+ *
+ * @param filterInactiveSubClusters whether to filter out inactive
+ * sub-clusters
+ * @param flushCache flag to indicate if the cache should be flushed or not
+ * @return the sub cluster information
+ * @throws YarnException if the call to the state store is unsuccessful
+ */
+ public Map<SubClusterId, SubClusterInfo> getSubClusters(
+ final boolean filterInactiveSubClusters, final boolean flushCache)
+ throws YarnException {
+ if (flushCache && isCachingEnabled()) {
+ LOG.info("Flushing subClusters from cache and rehydrating from store.");
+ cache.remove(buildGetSubClustersCacheRequest(filterInactiveSubClusters));
+ }
+ return getSubClusters(filterInactiveSubClusters);
+ }
+
+ /**
* Returns the {@link SubClusterPolicyConfiguration} for the specified queue.
*
* @param queue the queue whose policy is required
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f833e1b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index c1f7460..f6cfba0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.service.CompositeService;
@@ -28,6 +31,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,36 +59,26 @@ public class GlobalPolicyGenerator extends CompositeService {
// Federation Variables
private GPGContext gpgContext;
+ // Scheduler service that runs tasks periodically
+ private ScheduledThreadPoolExecutor scheduledExecutorService;
+ private SubClusterCleaner subClusterCleaner;
+
public GlobalPolicyGenerator() {
super(GlobalPolicyGenerator.class.getName());
this.gpgContext = new GPGContextImpl();
}
- protected void initAndStart(Configuration conf, boolean hasToReboot) {
- try {
- // Remove the old hook if we are rebooting.
- if (hasToReboot && null != gpgShutdownHook) {
- ShutdownHookManager.get().removeShutdownHook(gpgShutdownHook);
- }
-
- gpgShutdownHook = new CompositeServiceShutdownHook(this);
- ShutdownHookManager.get().addShutdownHook(gpgShutdownHook,
- SHUTDOWN_HOOK_PRIORITY);
-
- this.init(conf);
- this.start();
- } catch (Throwable t) {
- LOG.error("Error starting globalpolicygenerator", t);
- System.exit(-1);
- }
- }
-
@Override
protected void serviceInit(Configuration conf) throws Exception {
// Set up the context
this.gpgContext
.setStateStoreFacade(FederationStateStoreFacade.getInstance());
+ this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
+ conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
+ YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
+ this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
+
DefaultMetricsSystem.initialize(METRICS_NAME);
// super.serviceInit after all services are added
@@ -94,10 +88,32 @@ public class GlobalPolicyGenerator extends CompositeService {
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
+
+ // Scheduler SubClusterCleaner service
+ long scCleanerIntervalMs = getConfig().getLong(
+ YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS);
+ if (scCleanerIntervalMs > 0) {
+ this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner,
+ 0, scCleanerIntervalMs, TimeUnit.MILLISECONDS);
+ LOG.info("Scheduled sub-cluster cleaner with interval: {}",
+ DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
+ }
}
@Override
protected void serviceStop() throws Exception {
+ try {
+ if (this.scheduledExecutorService != null
+ && !this.scheduledExecutorService.isShutdown()) {
+ this.scheduledExecutorService.shutdown();
+ LOG.info("Stopped ScheduledExecutorService");
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to shutdown ScheduledExecutorService", e);
+ throw e;
+ }
+
if (this.isStopping.getAndSet(true)) {
return;
}
@@ -113,20 +129,40 @@ public class GlobalPolicyGenerator extends CompositeService {
return this.gpgContext;
}
+ private void initAndStart(Configuration conf, boolean hasToReboot) {
+ // Remove the old hook if we are rebooting.
+ if (hasToReboot && null != gpgShutdownHook) {
+ ShutdownHookManager.get().removeShutdownHook(gpgShutdownHook);
+ }
+
+ gpgShutdownHook = new CompositeServiceShutdownHook(this);
+ ShutdownHookManager.get().addShutdownHook(gpgShutdownHook,
+ SHUTDOWN_HOOK_PRIORITY);
+
+ this.init(conf);
+ this.start();
+ }
+
@SuppressWarnings("resource")
public static void startGPG(String[] argv, Configuration conf) {
boolean federationEnabled =
conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
- if (federationEnabled) {
- Thread.setDefaultUncaughtExceptionHandler(
- new YarnUncaughtExceptionHandler());
- StringUtils.startupShutdownMessage(GlobalPolicyGenerator.class, argv,
- LOG);
- GlobalPolicyGenerator globalPolicyGenerator = new GlobalPolicyGenerator();
- globalPolicyGenerator.initAndStart(conf, false);
- } else {
- LOG.warn("Federation is not enabled. The gpg cannot start.");
+ try {
+ if (federationEnabled) {
+ Thread.setDefaultUncaughtExceptionHandler(
+ new YarnUncaughtExceptionHandler());
+ StringUtils.startupShutdownMessage(GlobalPolicyGenerator.class, argv,
+ LOG);
+ GlobalPolicyGenerator globalPolicyGenerator =
+ new GlobalPolicyGenerator();
+ globalPolicyGenerator.initAndStart(conf, false);
+ } else {
+ LOG.warn("Federation is not enabled. The gpg cannot start.");
+ }
+ } catch (Throwable t) {
+ LOG.error("Error starting globalpolicygenerator", t);
+ System.exit(-1);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f833e1b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java
new file mode 100644
index 0000000..dad5121
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner;
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The sub-cluster cleaner is one of the GPG's services that periodically checks
+ * the membership table in FederationStateStore and marks sub-clusters that have
+ * not sent a heartbeat in certain amount of time as LOST.
+ */
+public class SubClusterCleaner implements Runnable {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SubClusterCleaner.class);
+
+ private GPGContext gpgContext;
+ private long heartbeatExpirationMillis;
+
+ /**
+ * The sub-cluster cleaner runnable is invoked by the sub cluster cleaner
+ * service to check the membership table and mark as lost the sub clusters that
+ * have not sent a heartbeat in some amount of time.
+ */
+ public SubClusterCleaner(Configuration conf, GPGContext gpgContext) {
+ this.heartbeatExpirationMillis =
+ conf.getLong(YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS,
+ YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS);
+ this.gpgContext = gpgContext;
+ LOG.info("Initialized SubClusterCleaner with heartbeat expiration of {}",
+ DurationFormatUtils.formatDurationISO(this.heartbeatExpirationMillis));
+ }
+
+ @Override
+ public void run() {
+ try {
+ Date now = new Date();
+ LOG.info("SubClusterCleaner at {}", now);
+
+ Map<SubClusterId, SubClusterInfo> infoMap =
+ this.gpgContext.getStateStoreFacade().getSubClusters(false, true);
+
+ // Iterate over each sub cluster and check last heartbeat
+ for (Map.Entry<SubClusterId, SubClusterInfo> entry : infoMap.entrySet()) {
+ SubClusterInfo subClusterInfo = entry.getValue();
+
+ Date lastHeartBeat = new Date(subClusterInfo.getLastHeartBeat());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking subcluster {} in state {}, last heartbeat at {}",
+ subClusterInfo.getSubClusterId(), subClusterInfo.getState(),
+ lastHeartBeat);
+ }
+
+ if (!subClusterInfo.getState().isUnusable()) {
+ long timeUntilDeregister = this.heartbeatExpirationMillis
+ - (now.getTime() - lastHeartBeat.getTime());
+ // Deregister sub-cluster as SC_LOST if last heartbeat too old
+ if (timeUntilDeregister < 0) {
+ LOG.warn(
+ "Deregistering subcluster {} in state {} last heartbeat at {}",
+ subClusterInfo.getSubClusterId(), subClusterInfo.getState(),
+ new Date(subClusterInfo.getLastHeartBeat()));
+ try {
+ this.gpgContext.getStateStoreFacade().deregisterSubCluster(
+ subClusterInfo.getSubClusterId(), SubClusterState.SC_LOST);
+ } catch (Exception e) {
+ LOG.error("deregisterSubCluster failed on subcluster "
+ + subClusterInfo.getSubClusterId(), e);
+ }
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Time until deregister for subcluster {}: {}",
+ entry.getKey(),
+ DurationFormatUtils.formatDurationISO(timeUntilDeregister));
+ }
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error("Subcluster cleaner fails: ", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f833e1b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java
new file mode 100644
index 0000000..f65444a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f833e1b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java
new file mode 100644
index 0000000..19b8802
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for Sub-cluster Cleaner in GPG.
+ */
+public class TestSubClusterCleaner {
+
+ private Configuration conf;
+ private MemoryFederationStateStore stateStore;
+ private FederationStateStoreFacade facade;
+ private SubClusterCleaner cleaner;
+ private GPGContext gpgContext;
+
+ private ArrayList<SubClusterId> subClusterIds;
+
+ @Before
+ public void setup() throws YarnException {
+ conf = new YarnConfiguration();
+
+ // subcluster expires in one second
+ conf.setLong(YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS, 1000);
+
+ stateStore = new MemoryFederationStateStore();
+ stateStore.init(conf);
+
+ facade = FederationStateStoreFacade.getInstance();
+ facade.reinitialize(stateStore, conf);
+
+ gpgContext = new GPGContextImpl();
+ gpgContext.setStateStoreFacade(facade);
+
+ cleaner = new SubClusterCleaner(conf, gpgContext);
+
+ // Create and register three sub clusters
+ subClusterIds = new ArrayList<SubClusterId>();
+ for (int i = 0; i < 3; i++) {
+ // Create sub cluster id and info
+ SubClusterId subClusterId =
+ SubClusterId.newInstance("SUBCLUSTER-" + Integer.toString(i));
+
+ SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+ "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3", "1.2.3.4:4",
+ SubClusterState.SC_RUNNING, System.currentTimeMillis(), "");
+ // Register the sub cluster
+ stateStore.registerSubCluster(
+ SubClusterRegisterRequest.newInstance(subClusterInfo));
+ // Append the id to a local list
+ subClusterIds.add(subClusterId);
+ }
+ }
+
+ @After
+ public void breakDown() throws Exception {
+ stateStore.close();
+ }
+
+ @Test
+ public void testSubClusterRegisterHeartBeatTime() throws YarnException {
+ cleaner.run();
+ Assert.assertEquals(3, facade.getSubClusters(true, true).size());
+ }
+
+ /**
+ * Test the base use case.
+ */
+ @Test
+ public void testSubClusterHeartBeat() throws YarnException {
+ // The first subcluster reports as Unhealthy
+ SubClusterId subClusterId = subClusterIds.get(0);
+ stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
+ .newInstance(subClusterId, SubClusterState.SC_UNHEALTHY, "capacity"));
+
+ // The second subcluster didn't heartbeat for two seconds, should mark lost
+ subClusterId = subClusterIds.get(1);
+ stateStore.setSubClusterLastHeartbeat(subClusterId,
+ System.currentTimeMillis() - 2000);
+
+ cleaner.run();
+ Assert.assertEquals(1, facade.getSubClusters(true, true).size());
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org