You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/05/09 00:00:26 UTC
[20/42] hadoop git commit: YARN-3671. Integrate Federation services
with ResourceManager. Contributed by Subru Krishnan
YARN-3671. Integrate Federation services with ResourceManager. Contributed by Subru Krishnan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3d508f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3d508f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3d508f0
Branch: refs/heads/YARN-2915
Commit: e3d508f07207026ad02a0fc73021dbe3284cd4b3
Parents: b96be3d
Author: Jian He <ji...@apache.org>
Authored: Tue Aug 30 12:20:52 2016 +0800
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon May 8 16:59:12 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 11 +-
.../yarn/conf/TestYarnConfigurationFields.java | 4 +-
.../failover/FederationProxyProviderUtil.java | 2 +-
.../FederationRMFailoverProxyProvider.java | 4 +-
...ationMembershipStateStoreInputValidator.java | 7 +-
.../TestFederationStateStoreInputValidator.java | 10 +-
.../server/resourcemanager/ResourceManager.java | 26 ++
.../FederationStateStoreHeartbeat.java | 108 +++++++
.../federation/FederationStateStoreService.java | 304 +++++++++++++++++++
.../federation/package-info.java | 17 ++
.../webapp/dao/ClusterMetricsInfo.java | 5 +-
.../TestFederationRMStateStoreService.java | 170 +++++++++++
12 files changed, 648 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3d508f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 05321e1..3508fad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2546,9 +2546,6 @@ public class YarnConfiguration extends Configuration {
FEDERATION_PREFIX + "failover.enabled";
public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true;
- public static final String FEDERATION_SUBCLUSTER_ID =
- FEDERATION_PREFIX + "sub-cluster.id";
-
public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
FEDERATION_PREFIX + "state-store.class";
@@ -2561,6 +2558,14 @@ public class YarnConfiguration extends Configuration {
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
+ /**
+ * Interval, in seconds, between the heartbeats a federated ResourceManager
+ * sends to the federation state store. Values less than or equal to zero
+ * fall back to {@link #DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS}.
+ */
+ public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
+ FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";
+
+ // 5 minutes (300 seconds)
+ public static final int
+ DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
+ 5 * 60;
+
public static final String FEDERATION_MACHINE_LIST =
FEDERATION_PREFIX + "machine-list";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3d508f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index c4d8f38..5e0876f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -72,9 +72,9 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
configurationPropsToSkipCompare
- .add(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
- configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
// Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3d508f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
index a986008..18f1338 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
@@ -134,7 +134,7 @@ public final class FederationProxyProviderUtil {
// are based out of conf
private static void updateConf(Configuration conf,
SubClusterId subClusterId) {
- conf.set(YarnConfiguration.FEDERATION_SUBCLUSTER_ID, subClusterId.getId());
+ conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
// In a Federation setting, we will connect to not just the local cluster RM
// but also multiple external RMs. The membership information of all the RMs
// that are currently
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3d508f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
index 90a9239..0ffab0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
@@ -74,8 +74,8 @@ public class FederationRMFailoverProxyProvider<T>
this.protocol = proto;
this.rmProxy.checkAllowedProtocols(this.protocol);
String clusterId =
- configuration.get(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
- Preconditions.checkNotNull(clusterId, "Missing Federation SubClusterId");
+ configuration.get(YarnConfiguration.RM_CLUSTER_ID);
+ Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
this.subClusterId = SubClusterId.newInstance(clusterId);
this.facade = facade.getInstance();
if (configuration instanceof YarnConfiguration) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3d508f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
index b587ee5..ff9d8e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
@@ -159,7 +159,10 @@ public final class FederationMembershipStateStoreInputValidator {
}
/**
- * Validate if the SubCluster Info are present or not.
+ * Validate if all the required fields on {@link SubClusterInfo} are present
+ * or not. {@code Capability} will be empty as the corresponding
+ * {@code ResourceManager} is in the process of initialization during
+ * registration.
*
* @param subClusterInfo the information of the subcluster to be verified
* @throws FederationStateStoreInvalidInputException if the SubCluster Info
@@ -194,8 +197,6 @@ public final class FederationMembershipStateStoreInputValidator {
// validate subcluster state
checkSubClusterState(subClusterInfo.getState());
- // validate subcluster capability
- checkCapability(subClusterInfo.getCapability());
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3d508f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
index 13175ae..b95f17a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
@@ -242,11 +242,8 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
.validateSubClusterRegisterRequest(request);
- Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
- LOG.info(e.getMessage());
- Assert.assertTrue(
- e.getMessage().startsWith("Invalid capability information."));
+ Assert.fail(e.getMessage());
}
// Execution with Empty Capability
@@ -260,11 +257,8 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
.validateSubClusterRegisterRequest(request);
- Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
- LOG.info(e.getMessage());
- Assert.assertTrue(
- e.getMessage().startsWith("Invalid capability information."));
+ Assert.fail(e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3d508f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 81c3f1b..2a5f03e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
@@ -188,6 +189,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected RMAppManager rmAppManager;
protected ApplicationACLsManager applicationACLsManager;
protected QueueACLsManager queueACLsManager;
+ private FederationStateStoreService federationStateStoreService;
private WebApp webApp;
private AppReportFetcher fetcher = null;
protected ResourceTrackerService resourceTracker;
@@ -504,6 +506,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new RMTimelineCollectorManager(rmContext);
}
+ /**
+ * Creates the service that registers this RM with the federation state
+ * store on start and then publishes periodic heartbeats. Called from
+ * serviceInit when federation is enabled.
+ */
+ private FederationStateStoreService createFederationStateStoreService() {
+ return new FederationStateStoreService(rmContext);
+ }
+
protected SystemMetricsPublisher createSystemMetricsPublisher() {
SystemMetricsPublisher publisher;
if (YarnConfiguration.timelineServiceEnabled(conf) &&
@@ -732,6 +738,20 @@ public class ResourceManager extends CompositeService implements Recoverable {
delegationTokenRenewer.setRMContext(rmContext);
}
+ if(HAUtil.isFederationEnabled(conf)) {
+ String cId = YarnConfiguration.getClusterId(conf);
+ if (cId.isEmpty()) {
+ String errMsg =
+ "Cannot initialize RM as Federation is enabled"
+ + " but cluster id is not configured.";
+ LOG.error(errMsg);
+ throw new YarnRuntimeException(errMsg);
+ }
+ federationStateStoreService = createFederationStateStoreService();
+ addIfService(federationStateStoreService);
+ LOG.info("Initialized Federation membership.");
+ }
+
new RMNMInfo(rmContext, scheduler);
super.serviceInit(conf);
@@ -1396,6 +1416,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
+ /**
+ * Returns the federation membership service. The field is assigned in
+ * serviceInit only when federation is enabled; otherwise this presumably
+ * returns {@code null} (TODO confirm no other assignment exists).
+ */
@Private
+ @VisibleForTesting
+ public FederationStateStoreService getFederationStateStoreService() {
+ return this.federationStateStoreService;
+ }
+
+ @Private
WebApp getWebapp() {
return this.webApp;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3d508f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreHeartbeat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreHeartbeat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreHeartbeat.java
new file mode 100644
index 0000000..a4618a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreHeartbeat.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.federation;
+
+import java.io.StringWriter;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONMarshaller;
+
+/**
+ * Periodic heart beat from a <code>ResourceManager</code> participating in
+ * federation to indicate liveliness. The heart beat publishes the current
+ * capabilities as represented by {@link ClusterMetricsInfo} of the sub cluster.
+ *
+ */
+public class FederationStateStoreHeartbeat implements Runnable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationStateStoreHeartbeat.class);
+
+  private final SubClusterId subClusterId;
+  private final FederationStateStore stateStoreService;
+
+  private final ResourceScheduler rs;
+
+  // Buffer reused for every JSON rendering; it is only touched from run(),
+  // which is synchronized.
+  private final StringWriter currentClusterState;
+  // Null if the JAXB context could not be created in the constructor.
+  private final JSONMarshaller marshaller;
+  // Last successfully rendered capability; null until the first success.
+  private String capability;
+
+  /**
+   * Creates the heartbeat task and the JSON marshaller used to render
+   * {@link ClusterMetricsInfo}. A marshaller initialization failure is only
+   * logged; heartbeats then keep carrying the last known (initially null)
+   * capability.
+   *
+   * @param subClusterId id reported in every heartbeat
+   * @param stateStoreClient state store that receives the heartbeats
+   * @param scheduler scheduler whose metrics form the published capability
+   */
+  public FederationStateStoreHeartbeat(SubClusterId subClusterId,
+      FederationStateStore stateStoreClient, ResourceScheduler scheduler) {
+    this.stateStoreService = stateStoreClient;
+    this.subClusterId = subClusterId;
+    this.rs = scheduler;
+    // Initialize the JAXB Marshaller
+    this.currentClusterState = new StringWriter();
+    JSONMarshaller jsonMarshaller = null;
+    try {
+      JSONJAXBContext jc = new JSONJAXBContext(
+          JSONConfiguration.mapped().rootUnwrapping(false).build(),
+          ClusterMetricsInfo.class);
+      jsonMarshaller = jc.createJSONMarshaller();
+    } catch (JAXBException e) {
+      LOG.warn("Exception while trying to initialize JAXB context.", e);
+    }
+    this.marshaller = jsonMarshaller;
+    LOG.info("Initialized Federation membership for cluster with timestamp: "
+        + ResourceManager.getClusterTimeStamp());
+  }
+
+  /**
+   * Refreshes {@code capability} with a JSON rendering of the current
+   * {@link ClusterMetricsInfo}. On any failure the previous value is kept.
+   */
+  private void updateClusterState() {
+    if (marshaller == null) {
+      // Initialization already failed and was logged; avoid an NPE per beat.
+      LOG.warn("JAXB marshaller is not available,"
+          + " so reverting to last known state.");
+      return;
+    }
+    try {
+      // get the current state
+      currentClusterState.getBuffer().setLength(0);
+      ClusterMetricsInfo clusterMetricsInfo = new ClusterMetricsInfo(rs);
+      marshaller.marshallToJSON(clusterMetricsInfo, currentClusterState);
+      capability = currentClusterState.toString();
+    } catch (Exception e) {
+      LOG.warn("Exception while trying to generate cluster state,"
+          + " so reverting to last known state.", e);
+    }
+  }
+
+  /**
+   * Sends one heartbeat that marks this sub cluster {@code SC_RUNNING} with
+   * the latest capability. Exceptions are logged, not propagated: a task that
+   * throws from a ScheduledExecutorService is never rescheduled.
+   */
+  @Override
+  public synchronized void run() {
+    try {
+      updateClusterState();
+      SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
+          .newInstance(subClusterId, SubClusterState.SC_RUNNING, capability);
+      LOG.debug("Sending the heartbeat with capability: {}", capability);
+      stateStoreService.subClusterHeartbeat(request);
+    } catch (Exception e) {
+      LOG.warn("Exception when trying to heartbeat: ", e);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3d508f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
new file mode 100644
index 0000000..9a01d7e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.federation;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Implements {@link FederationStateStore} and provides a service for
+ * participating in the federation membership.
+ */
+public class FederationStateStoreService extends AbstractService
+    implements FederationStateStore {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FederationStateStoreService.class);
+
+  private Configuration config;
+  private ScheduledExecutorService scheduledExecutorService;
+  private FederationStateStoreHeartbeat stateStoreHeartbeat;
+  private FederationStateStore stateStoreClient = null;
+  private SubClusterId subClusterId;
+  // Heartbeat period, in seconds.
+  private long heartbeatInterval;
+  private RMContext rmContext;
+
+  /**
+   * Creates the service. The state store client itself is only created in
+   * {@link #serviceInit(Configuration)}.
+   *
+   * @param rmContext context of the owning ResourceManager; used on start to
+   *          look up the client/AM service addresses and the scheduler
+   */
+  public FederationStateStoreService(RMContext rmContext) {
+    super(FederationStateStoreService.class.getName());
+    LOG.info("FederationStateStoreService initialized");
+    this.rmContext = rmContext;
+  }
+
+  /**
+   * Creates the retrying {@link FederationStateStore} client configured by
+   * {@link YarnConfiguration#FEDERATION_STATESTORE_CLIENT_CLASS}, derives the
+   * sub cluster id from {@code YarnConfiguration.getClusterId(conf)} and reads
+   * the heartbeat interval. A non-positive interval falls back to the default.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+
+    this.config = conf;
+
+    RetryPolicy retryPolicy =
+        FederationStateStoreFacade.createRetryPolicy(conf);
+
+    this.stateStoreClient =
+        (FederationStateStore) FederationStateStoreFacade.createRetryInstance(
+            conf, YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
+            YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS,
+            FederationStateStore.class, retryPolicy);
+    this.stateStoreClient.init(conf);
+    LOG.info("Initialized state store client class");
+
+    this.subClusterId =
+        SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
+
+    heartbeatInterval = conf.getLong(
+        YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS,
+        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
+    if (heartbeatInterval <= 0) {
+      heartbeatInterval =
+          YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS;
+    }
+    LOG.info("Initialized federation membership service.");
+
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Registers this sub cluster with the state store and starts the periodic
+   * heartbeat.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+
+    registerAndInitializeHeartbeat();
+
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the heartbeat, deregisters the sub cluster (state
+   * {@code SC_UNREGISTERED}) and closes the state store client. Every step is
+   * attempted even if an earlier one fails; the first failure is rethrown
+   * with any later ones attached as suppressed exceptions.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    Exception ex = null;
+    try {
+      if (this.scheduledExecutorService != null
+          && !this.scheduledExecutorService.isShutdown()) {
+        this.scheduledExecutorService.shutdown();
+        LOG.info("Stopped federation membership heartbeat");
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to shutdown ScheduledExecutorService", e);
+      ex = e;
+    }
+
+    if (this.stateStoreClient != null) {
+      // subClusterId is still unset if serviceInit failed after creating the
+      // client; there is nothing to deregister in that case.
+      if (this.subClusterId != null) {
+        try {
+          deregisterSubCluster(SubClusterDeregisterRequest
+              .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
+        } catch (Exception e) {
+          LOG.error("Failed to deregister federation membership", e);
+          ex = chainFailure(ex, e);
+        }
+      }
+      try {
+        this.stateStoreClient.close();
+      } catch (Exception e) {
+        LOG.error("Failed to close the federation state store client", e);
+        ex = chainFailure(ex, e);
+      }
+    }
+
+    super.serviceStop();
+
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  /**
+   * Keeps the first failure as the one to rethrow and attaches later ones to
+   * it as suppressed exceptions, so no failure is silently lost.
+   */
+  private static Exception chainFailure(Exception first, Exception next) {
+    if (first == null) {
+      return next;
+    }
+    first.addSuppressed(next);
+    return first;
+  }
+
+  // Return a client accessible string representation of the service address.
+  private String getServiceAddress(InetSocketAddress address) {
+    InetSocketAddress socketAddress = NetUtils.getConnectAddress(address);
+    return socketAddress.getAddress().getHostAddress() + ":"
+        + socketAddress.getPort();
+  }
+
+  /**
+   * Registers this RM as a sub cluster in state {@code SC_NEW} with an empty
+   * capability (the RM is still starting up), then schedules a
+   * {@link FederationStateStoreHeartbeat} with a fixed delay of
+   * {@code heartbeatInterval} seconds.
+   *
+   * @throws YarnRuntimeException if the registration fails
+   */
+  private void registerAndInitializeHeartbeat() {
+    String clientRMAddress =
+        getServiceAddress(rmContext.getClientRMService().getBindAddress());
+    String amRMAddress = getServiceAddress(
+        rmContext.getApplicationMasterService().getBindAddress());
+    String rmAdminAddress = getServiceAddress(
+        config.getSocketAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
+    String webAppAddress =
+        WebAppUtils.getResolvedRemoteRMWebAppURLWithoutScheme(config);
+
+    SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+        amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
+        SubClusterState.SC_NEW, ResourceManager.getClusterTimeStamp(), "");
+    try {
+      registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo));
+      LOG.info("Successfully registered for federation subcluster: {}",
+          subClusterInfo);
+    } catch (Exception e) {
+      throw new YarnRuntimeException(
+          "Failed to register Federation membership with the StateStore", e);
+    }
+    stateStoreHeartbeat = new FederationStateStoreHeartbeat(subClusterId,
+        stateStoreClient, rmContext.getScheduler());
+    scheduledExecutorService =
+        HadoopExecutors.newSingleThreadScheduledExecutor();
+    scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat,
+        heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS);
+    LOG.info("Started federation membership heartbeat with interval: {}",
+        heartbeatInterval);
+  }
+
+  @VisibleForTesting
+  public FederationStateStore getStateStoreClient() {
+    return stateStoreClient;
+  }
+
+  @VisibleForTesting
+  public FederationStateStoreHeartbeat getStateStoreHeartbeatThread() {
+    return stateStoreHeartbeat;
+  }
+
+  // ------------------------------------------------------------------------
+  // FederationStateStore API: every method below forwards to the matching
+  // method of the retrying state store client created in serviceInit.
+  // ------------------------------------------------------------------------
+
+  @Override
+  public Version getCurrentVersion() {
+    return stateStoreClient.getCurrentVersion();
+  }
+
+  @Override
+  public Version loadVersion() {
+    // Forward to the client's own loadVersion(), not getCurrentVersion(), so
+    // the two distinct interface methods are not conflated.
+    return stateStoreClient.loadVersion();
+  }
+
+  @Override
+  public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+      GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+    return stateStoreClient.getPolicyConfiguration(request);
+  }
+
+  @Override
+  public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
+      SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+    return stateStoreClient.setPolicyConfiguration(request);
+  }
+
+  @Override
+  public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
+      GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
+    return stateStoreClient.getPoliciesConfigurations(request);
+  }
+
+  @Override
+  public SubClusterRegisterResponse registerSubCluster(
+      SubClusterRegisterRequest registerSubClusterRequest)
+      throws YarnException {
+    return stateStoreClient.registerSubCluster(registerSubClusterRequest);
+  }
+
+  @Override
+  public SubClusterDeregisterResponse deregisterSubCluster(
+      SubClusterDeregisterRequest subClusterDeregisterRequest)
+      throws YarnException {
+    return stateStoreClient.deregisterSubCluster(subClusterDeregisterRequest);
+  }
+
+  @Override
+  public SubClusterHeartbeatResponse subClusterHeartbeat(
+      SubClusterHeartbeatRequest subClusterHeartbeatRequest)
+      throws YarnException {
+    return stateStoreClient.subClusterHeartbeat(subClusterHeartbeatRequest);
+  }
+
+  @Override
+  public GetSubClusterInfoResponse getSubCluster(
+      GetSubClusterInfoRequest subClusterRequest) throws YarnException {
+    return stateStoreClient.getSubCluster(subClusterRequest);
+  }
+
+  @Override
+  public GetSubClustersInfoResponse getSubClusters(
+      GetSubClustersInfoRequest subClustersRequest) throws YarnException {
+    return stateStoreClient.getSubClusters(subClustersRequest);
+  }
+
+  @Override
+  public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
+      AddApplicationHomeSubClusterRequest request) throws YarnException {
+    return stateStoreClient.addApplicationHomeSubCluster(request);
+  }
+
+  @Override
+  public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
+      UpdateApplicationHomeSubClusterRequest request) throws YarnException {
+    return stateStoreClient.updateApplicationHomeSubCluster(request);
+  }
+
+  @Override
+  public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
+      GetApplicationHomeSubClusterRequest request) throws YarnException {
+    return stateStoreClient.getApplicationHomeSubCluster(request);
+  }
+
+  @Override
+  public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
+      GetApplicationsHomeSubClusterRequest request) throws YarnException {
+    return stateStoreClient.getApplicationsHomeSubCluster(request);
+  }
+
+  @Override
+  public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
+      DeleteApplicationHomeSubClusterRequest request) throws YarnException {
+    return stateStoreClient.deleteApplicationHomeSubCluster(request);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3d508f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/package-info.java
new file mode 100644
index 0000000..47c7c65
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.federation;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3d508f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
index f083b05..dc42eb6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
@@ -65,7 +65,10 @@ public class ClusterMetricsInfo {
} // JAXB needs this
public ClusterMetricsInfo(final ResourceManager rm) {
- ResourceScheduler rs = rm.getResourceScheduler();
+ this(rm.getResourceScheduler());
+ }
+
+ public ClusterMetricsInfo(final ResourceScheduler rs) {
QueueMetrics metrics = rs.getRootQueueMetrics();
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3d508f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
new file mode 100644
index 0000000..30f69b5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.federation;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONUnmarshaller;
+
+/**
+ * Unit tests for FederationStateStoreService: sub-cluster registration,
+ * heartbeats, deregistration and re-registration after an RM failover.
+ */
+public class TestFederationRMStateStoreService {
+
+  private final HAServiceProtocol.StateChangeRequestInfo requestInfo =
+      new HAServiceProtocol.StateChangeRequestInfo(
+          HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+  private final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
+  private final GetSubClusterInfoRequest request =
+      GetSubClusterInfoRequest.newInstance(subClusterId);
+
+  private Configuration conf;
+  private MockRM rm;
+  private FederationStateStore stateStore;
+  // Last heartbeat timestamp seen; checkSubClusterInfo asserts it never
+  // moves backwards.
+  private long lastHeartbeatTS = 0;
+  private JSONJAXBContext jc;
+  private JSONUnmarshaller unmarshaller;
+
+  @Before
+  public void setUp() throws IOException, YarnException, JAXBException {
+    conf = new YarnConfiguration();
+    jc = new JSONJAXBContext(
+        JSONConfiguration.mapped().rootUnwrapping(false).build(),
+        ClusterMetricsInfo.class);
+    unmarshaller = jc.createJSONUnmarshaller();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // Stop the RM here rather than at the end of the test so it is shut down
+    // even when an assertion fails part-way through.
+    if (rm != null) {
+      rm.stop();
+      rm = null;
+    }
+    unmarshaller = null;
+    jc = null;
+  }
+
+  @Test
+  public void testFederationStateStoreService() throws Exception {
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+    rm = new MockRM(conf);
+
+    // Initially there should be no entry for the sub-cluster
+    rm.init(conf);
+    stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+    try {
+      stateStore.getSubCluster(request);
+      Assert.fail("There should be no entry for the sub-cluster.");
+    } catch (YarnException e) {
+      Assert.assertTrue("Unexpected message: " + e.getMessage(),
+          e.getMessage().endsWith("does not exist"));
+    }
+
+    // Validate if sub-cluster is registered
+    rm.start();
+    String capability = checkSubClusterInfo(SubClusterState.SC_NEW);
+    Assert.assertTrue(capability.isEmpty());
+
+    // Heartbeat to see if sub-cluster transitions to running and reports
+    // newly registered nodes
+    checkHeartbeats();
+
+    // Validate sub-cluster deregistration
+    rm.getFederationStateStoreService()
+        .deregisterSubCluster(SubClusterDeregisterRequest
+            .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
+    checkSubClusterInfo(SubClusterState.SC_UNREGISTERED);
+
+    // check after failover: the sub-cluster is registered afresh
+    explicitFailover();
+    capability = checkSubClusterInfo(SubClusterState.SC_NEW);
+    Assert.assertTrue(capability.isEmpty());
+
+    // Heartbeats from the re-activated RM must behave the same way
+    checkHeartbeats();
+  }
+
+  /**
+   * Drives the state store heartbeat of the current RM twice -- before and
+   * after registering a node -- checking each time that the sub-cluster is
+   * RUNNING and that its capability reports the expected number of nodes.
+   */
+  private void checkHeartbeats() throws Exception {
+    // Re-read the heartbeat on each call since a failover may replace it
+    FederationStateStoreHeartbeat storeHeartbeat =
+        rm.getFederationStateStoreService().getStateStoreHeartbeatThread();
+    storeHeartbeat.run();
+    String capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
+    checkClusterMetricsInfo(capability, 0);
+
+    // heartbeat again after adding a node.
+    rm.registerNode("127.0.0.1:1234", 4 * 1024);
+    storeHeartbeat.run();
+    capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
+    checkClusterMetricsInfo(capability, 1);
+  }
+
+  /**
+   * Transitions the RM to standby and back to active, asserting each HA
+   * state, then resets the heartbeat watermark and re-reads the state store
+   * client from the (possibly re-created) federation service.
+   */
+  private void explicitFailover() throws IOException {
+    rm.getAdminService().transitionToStandby(requestInfo);
+    Assert.assertEquals(HAServiceProtocol.HAServiceState.STANDBY,
+        rm.getRMContext().getHAServiceState());
+    rm.getAdminService().transitionToActive(requestInfo);
+    Assert.assertEquals(HAServiceProtocol.HAServiceState.ACTIVE,
+        rm.getRMContext().getHAServiceState());
+    lastHeartbeatTS = 0;
+    stateStore = rm.getFederationStateStoreService().getStateStoreClient();
+  }
+
+  /**
+   * Parses a sub-cluster capability string as a JSON-serialized
+   * ClusterMetricsInfo and asserts its total node count.
+   */
+  private void checkClusterMetricsInfo(String capability, int numNodes)
+      throws JAXBException {
+    ClusterMetricsInfo clusterMetricsInfo = unmarshaller.unmarshalFromJSON(
+        new StringReader(capability), ClusterMetricsInfo.class);
+    Assert.assertEquals(numNodes, clusterMetricsInfo.getTotalNodes());
+  }
+
+  /**
+   * Looks up the sub-cluster once, asserts its state and that its heartbeat
+   * timestamp has not gone backwards, and returns its capability string.
+   */
+  private String checkSubClusterInfo(SubClusterState state)
+      throws YarnException {
+    GetSubClusterInfoResponse response = stateStore.getSubCluster(request);
+    Assert.assertNotNull(response);
+    SubClusterInfo subClusterInfo = response.getSubClusterInfo();
+    Assert.assertNotNull(subClusterInfo);
+    Assert.assertEquals(state, subClusterInfo.getState());
+    Assert.assertTrue(subClusterInfo.getLastHeartBeat() >= lastHeartbeatTS);
+    lastHeartbeatTS = subClusterInfo.getLastHeartBeat();
+    return subClusterInfo.getCapability();
+  }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org