You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/07/28 22:57:59 UTC
[04/50] [abbrv] hadoop git commit: YARN-3672. Create Facade for
Federation State and Policy Store. Contributed by Subru Krishnan
YARN-3672. Create Facade for Federation State and Policy Store. Contributed by Subru Krishnan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/34d4c900
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/34d4c900
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/34d4c900
Branch: refs/heads/YARN-2915
Commit: 34d4c900289c8f3562be8534bb12680dd52278b1
Parents: 0213f66
Author: Jian He <ji...@apache.org>
Authored: Wed Aug 17 11:13:19 2016 +0800
Committer: Subru Krishnan <su...@apache.org>
Committed: Fri Jul 28 15:45:05 2017 -0700
----------------------------------------------------------------------
hadoop-project/pom.xml | 13 +
.../hadoop/yarn/conf/YarnConfiguration.java | 13 +
.../yarn/conf/TestYarnConfigurationFields.java | 4 +
.../src/main/resources/yarn-default.xml | 20 +-
.../hadoop-yarn-server-common/pom.xml | 10 +
.../utils/FederationStateStoreFacade.java | 532 +++++++++++++++++++
.../server/federation/utils/package-info.java | 17 +
.../utils/FederationStateStoreTestUtil.java | 149 ++++++
.../utils/TestFederationStateStoreFacade.java | 148 ++++++
9 files changed, 905 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d4c900/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index b9819b4..93bbcf8 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -98,6 +98,9 @@
<apacheds.version>2.0.0-M21</apacheds.version>
<ldap-api.version>1.0.0-M33</ldap-api.version>
+ <jcache.version>1.0.0</jcache.version>
+ <ehcache.version>3.0.3</ehcache.version>
+
<!-- define the Java language version used by the compiler -->
<javac.version>1.8</javac.version>
@@ -1265,6 +1268,16 @@
<artifactId>kerb-simplekdc</artifactId>
<version>1.0.0</version>
</dependency>
+ <dependency>
+ <groupId>javax.cache</groupId>
+ <artifactId>cache-api</artifactId>
+ <version>${jcache.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.ehcache</groupId>
+ <artifactId>ehcache</artifactId>
+ <version>${ehcache.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d4c900/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 3e778ee..fe6c7b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2560,6 +2560,19 @@ public class YarnConfiguration extends Configuration {
////////////////////////////////
public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation.";
+
+ public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
+ FEDERATION_PREFIX + "state-store.class";
+
+ public static final String DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS =
+ "org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore";
+
+ public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
+ FEDERATION_PREFIX + "cache-ttl.secs";
+
+ // 5 minutes
+ public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
+
public static final String FEDERATION_MACHINE_LIST =
FEDERATION_PREFIX + "machine-list";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d4c900/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 3da4bab..bfc2534 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -68,6 +68,10 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER_PROTOCOL);
configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
+ // Federation default configs to be ignored
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
+
// Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress"
configurationPropsToSkipCompare.add(YarnConfiguration.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d4c900/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e20aad5..0b0a160 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2686,8 +2686,8 @@
<description>The arguments to pass to the Node label script.</description>
<name>yarn.nodemanager.node-labels.provider.script.opts</name>
</property>
- <!-- Other Configuration -->
+ <!-- Federation Configuration -->
<property>
<description>
Machine list file to be loaded by the FederationSubCluster Resolver
@@ -2696,6 +2696,24 @@
</property>
<property>
+ <description>
+ Store class name for federation state store
+ </description>
+ <name>yarn.federation.state-store.class</name>
+ <value>org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore</value>
+ </property>
+
+ <property>
+ <description>
+ The time in seconds after which the federation state store local cache
+ will be refreshed periodically
+ </description>
+ <name>yarn.federation.cache-ttl.secs</name>
+ <value>300</value>
+ </property>
+
+ <!-- Other Configuration -->
+ <property>
<description>The interval that the yarn client library uses to poll the
completion status of the asynchronous API of application client protocol.
</description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d4c900/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 89dec30..def5357 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -102,6 +102,16 @@
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
+ <dependency>
+ <groupId>javax.cache</groupId>
+ <artifactId>cache-api</artifactId>
+ <version>${jcache.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.ehcache</groupId>
+ <artifactId>ehcache</artifactId>
+ <version>${ehcache.version}</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d4c900/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
new file mode 100644
index 0000000..f1c8218
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.utils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.CompleteConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableConfiguration;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheLoader;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.spi.CachingProvider;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ *
+ * The FederationStateStoreFacade is an utility wrapper that provides singleton
+ * access to the Federation state store. It abstracts out retries and in
+ * addition, it also implements the caching for various objects.
+ *
+ */
+public final class FederationStateStoreFacade {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FederationStateStoreFacade.class);
+
+ private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
+ private static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
+ "getPoliciesConfigurations";
+
+ private static final FederationStateStoreFacade FACADE =
+ new FederationStateStoreFacade();
+
+ private FederationStateStore stateStore;
+ private int cacheTimeToLive;
+ private Configuration conf;
+ private Cache<Object, Object> cache;
+
+ private FederationStateStoreFacade() {
+ initializeFacadeInternal(new Configuration());
+ }
+
+ private void initializeFacadeInternal(Configuration config) {
+ this.conf = config;
+ try {
+ this.stateStore = (FederationStateStore) createRetryInstance(this.conf,
+ YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS,
+ FederationStateStore.class, createRetryPolicy(conf));
+ this.stateStore.init(conf);
+
+ initCache();
+
+ } catch (YarnException ex) {
+ LOG.error("Failed to initialize the FederationStateStoreFacade object",
+ ex);
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Delete and re-initialize the cache, to force it to use the given
+ * configuration.
+ *
+ * @param store the {@link FederationStateStore} instance to reinitialize with
+ * @param config the updated configuration to reinitialize with
+ */
+ @VisibleForTesting
+ public synchronized void reinitialize(FederationStateStore store,
+ Configuration config) {
+ this.conf = config;
+ this.stateStore = store;
+ clearCache();
+ initCache();
+ }
+
+ public static RetryPolicy createRetryPolicy(Configuration conf) {
+ // Retry settings for StateStore
+ RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetry(
+ conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, Integer.SIZE),
+ conf.getLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
+ YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS),
+ TimeUnit.MILLISECONDS);
+
+ return retryPolicy;
+ }
+
+ private boolean isCachingEnabled() {
+ return (cacheTimeToLive > 0);
+ }
+
+ private void initCache() {
+ // Picking the JCache provider from classpath, need to make sure there's
+ // no conflict or pick up a specific one in the future
+ cacheTimeToLive =
+ conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
+ YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
+ if (isCachingEnabled()) {
+ CachingProvider jcacheProvider = Caching.getCachingProvider();
+ CacheManager jcacheManager = jcacheProvider.getCacheManager();
+ this.cache = jcacheManager.getCache(this.getClass().getSimpleName());
+ if (this.cache == null) {
+ LOG.info("Creating a JCache Manager with name "
+ + this.getClass().getSimpleName());
+ Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
+ CompleteConfiguration<Object, Object> configuration =
+ new MutableConfiguration<Object, Object>().setStoreByValue(false)
+ .setReadThrough(true)
+ .setExpiryPolicyFactory(
+ new FactoryBuilder.SingletonFactory<ExpiryPolicy>(
+ new CreatedExpiryPolicy(cacheExpiry)))
+ .setCacheLoaderFactory(
+ new FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>>(
+ new CacheLoaderImpl<Object, Object>()));
+ this.cache = jcacheManager.createCache(this.getClass().getSimpleName(),
+ configuration);
+ }
+ }
+ }
+
+ private void clearCache() {
+ CachingProvider jcacheProvider = Caching.getCachingProvider();
+ CacheManager jcacheManager = jcacheProvider.getCacheManager();
+
+ jcacheManager.destroyCache(this.getClass().getSimpleName());
+ this.cache = null;
+ }
+
+ /**
+ * Returns the singleton instance of the FederationStateStoreFacade object.
+ *
+ * @return the singleton {@link FederationStateStoreFacade} instance
+ */
+ public static FederationStateStoreFacade getInstance() {
+ return FACADE;
+ }
+
+ /**
+ * Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}.
+ *
+ * @param subClusterId the identifier of the sub-cluster
+ * @return the sub cluster information
+ * @throws YarnException if the call to the state store is unsuccessful
+ */
+ public SubClusterInfo getSubCluster(final SubClusterId subClusterId)
+ throws YarnException {
+ if (isCachingEnabled()) {
+ return getSubClusters(false).get(subClusterId);
+ } else {
+ return stateStore
+ .getSubCluster(GetSubClusterInfoRequest.newInstance(subClusterId))
+ .getSubClusterInfo();
+ }
+ }
+
+ /**
+ * Updates the cache with the central {@link FederationStateStore} and returns
+ * the {@link SubClusterInfo} for the specified {@link SubClusterId}.
+ *
+ * @param subClusterId the identifier of the sub-cluster
+ * @param flushCache flag to indicate if the cache should be flushed or not
+ * @return the sub cluster information
+ * @throws YarnException if the call to the state store is unsuccessful
+ */
+ public SubClusterInfo getSubCluster(final SubClusterId subClusterId,
+ final boolean flushCache) throws YarnException {
+ if (flushCache && isCachingEnabled()) {
+ LOG.info("Flushing subClusters from cache and rehydrating from store,"
+ + " most likely on account of RM failover.");
+ cache.remove(buildGetSubClustersCacheRequest(false));
+ }
+ return getSubCluster(subClusterId);
+ }
+
+ /**
+ * Returns the {@link SubClusterInfo} of all active sub cluster(s).
+ *
+ * @param filterInactiveSubClusters whether to filter out inactive
+ * sub-clusters
+ * @return the information of all active sub cluster(s)
+ * @throws YarnException if the call to the state store is unsuccessful
+ */
+ @SuppressWarnings("unchecked")
+ public Map<SubClusterId, SubClusterInfo> getSubClusters(
+ final boolean filterInactiveSubClusters) throws YarnException {
+ try {
+ if (isCachingEnabled()) {
+ return (Map<SubClusterId, SubClusterInfo>) cache
+ .get(buildGetSubClustersCacheRequest(filterInactiveSubClusters));
+ } else {
+ return buildSubClusterInfoMap(stateStore.getSubClusters(
+ GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters)));
+ }
+ } catch (Throwable ex) {
+ throw new YarnException(ex);
+ }
+ }
+
+ /**
+ * Returns the {@link SubClusterPolicyConfiguration} for the specified queue.
+ *
+ * @param queue the queue whose policy is required
+ * @return the corresponding configured policy
+ * @throws YarnException if the call to the state store is unsuccessful
+ */
+ public SubClusterPolicyConfiguration getPolicyConfiguration(
+ final String queue) throws YarnException {
+ if (isCachingEnabled()) {
+ return getPoliciesConfigurations().get(queue);
+ } else {
+ return stateStore
+ .getPolicyConfiguration(
+ GetSubClusterPolicyConfigurationRequest.newInstance(queue))
+ .getPolicyConfiguration();
+ }
+
+ }
+
+ /**
+ * Get the policies that is represented as
+ * {@link SubClusterPolicyConfiguration} for all currently active queues in
+ * the system.
+ *
+ * @return the policies for all currently active queues in the system
+ * @throws YarnException if the call to the state store is unsuccessful
+ */
+ @SuppressWarnings("unchecked")
+ public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
+ throws YarnException {
+ try {
+ if (isCachingEnabled()) {
+ return (Map<String, SubClusterPolicyConfiguration>) cache
+ .get(buildGetPoliciesConfigurationsCacheRequest());
+ } else {
+ return buildPolicyConfigMap(stateStore.getPoliciesConfigurations(
+ GetSubClusterPoliciesConfigurationsRequest.newInstance()));
+ }
+ } catch (Throwable ex) {
+ throw new YarnException(ex);
+ }
+ }
+
+ /**
+ * Adds the home {@link SubClusterId} for the specified {@link ApplicationId}.
+ *
+ * @param appHomeSubCluster the mapping of the application to it's home
+ * sub-cluster
+ * @throws YarnException if the call to the state store is unsuccessful
+ */
+ public void addApplicationHomeSubCluster(
+ ApplicationHomeSubCluster appHomeSubCluster) throws YarnException {
+ stateStore.addApplicationHomeSubCluster(
+ AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster));
+ return;
+ }
+
+ /**
+ * Updates the home {@link SubClusterId} for the specified
+ * {@link ApplicationId}.
+ *
+ * @param appHomeSubCluster the mapping of the application to it's home
+ * sub-cluster
+ * @throws YarnException if the call to the state store is unsuccessful
+ */
+ public void updateApplicationHomeSubCluster(
+ ApplicationHomeSubCluster appHomeSubCluster) throws YarnException {
+ stateStore.updateApplicationHomeSubCluster(
+ UpdateApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster));
+ return;
+ }
+
+ /**
+ * Returns the home {@link SubClusterId} for the specified
+ * {@link ApplicationId}.
+ *
+ * @param appId the identifier of the application
+ * @return the home sub cluster identifier
+ * @throws YarnException if the call to the state store is unsuccessful
+ */
+ public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
+ throws YarnException {
+ GetApplicationHomeSubClusterResponse response =
+ stateStore.getApplicationHomeSubCluster(
+ GetApplicationHomeSubClusterRequest.newInstance(appId));
+ return response.getApplicationHomeSubCluster().getHomeSubCluster();
+ }
+
+ /**
+ * Helper method to create instances of Object using the class name defined in
+ * the configuration object. The instances creates {@link RetryProxy} using
+ * the specific {@link RetryPolicy}.
+ *
+ * @param conf the yarn configuration
+ * @param configuredClassName the configuration provider key
+ * @param defaultValue the default implementation for fallback
+ * @param type the class for which a retry proxy is required
+ * @param retryPolicy the policy for retrying method call failures
+ * @return a retry proxy for the specified interface
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> Object createRetryInstance(Configuration conf,
+ String configuredClassName, String defaultValue, Class<T> type,
+ RetryPolicy retryPolicy) {
+
+ String className = conf.get(configuredClassName, defaultValue);
+ try {
+ Class<?> clusterResolverClass = conf.getClassByName(className);
+ if (type.isAssignableFrom(clusterResolverClass)) {
+ return RetryProxy.create(type,
+ (T) ReflectionUtils.newInstance(clusterResolverClass, conf),
+ retryPolicy);
+ } else {
+ throw new YarnRuntimeException(
+ "Class: " + className + " not instance of " + type.getSimpleName());
+ }
+ } catch (Exception e) {
+ throw new YarnRuntimeException("Could not instantiate : " + className, e);
+ }
+ }
+
+ private Map<SubClusterId, SubClusterInfo> buildSubClusterInfoMap(
+ final GetSubClustersInfoResponse response) {
+ List<SubClusterInfo> subClusters = response.getSubClusters();
+ Map<SubClusterId, SubClusterInfo> subClustersMap =
+ new HashMap<>(subClusters.size());
+ for (SubClusterInfo subCluster : subClusters) {
+ subClustersMap.put(subCluster.getSubClusterId(), subCluster);
+ }
+ return subClustersMap;
+ }
+
+ private Object buildGetSubClustersCacheRequest(
+ final boolean filterInactiveSubClusters) {
+ final String cacheKey = buildCacheKey(getClass().getSimpleName(),
+ GET_SUBCLUSTERS_CACHEID, null);
+ CacheRequest<String, Map<SubClusterId, SubClusterInfo>> cacheRequest =
+ new CacheRequest<String, Map<SubClusterId, SubClusterInfo>>(cacheKey,
+ new Func<String, Map<SubClusterId, SubClusterInfo>>() {
+ @Override
+ public Map<SubClusterId, SubClusterInfo> invoke(String key)
+ throws Exception {
+ GetSubClustersInfoResponse subClusters =
+ stateStore.getSubClusters(GetSubClustersInfoRequest
+ .newInstance(filterInactiveSubClusters));
+ return buildSubClusterInfoMap(subClusters);
+ }
+ });
+ return cacheRequest;
+ }
+
+ private Map<String, SubClusterPolicyConfiguration> buildPolicyConfigMap(
+ GetSubClusterPoliciesConfigurationsResponse response) {
+ List<SubClusterPolicyConfiguration> policyConfigs =
+ response.getPoliciesConfigs();
+ Map<String, SubClusterPolicyConfiguration> queuePolicyConfigs =
+ new HashMap<>();
+ for (SubClusterPolicyConfiguration policyConfig : policyConfigs) {
+ queuePolicyConfigs.put(policyConfig.getQueue(), policyConfig);
+ }
+ return queuePolicyConfigs;
+ }
+
+ private Object buildGetPoliciesConfigurationsCacheRequest() {
+ final String cacheKey = buildCacheKey(getClass().getSimpleName(),
+ GET_POLICIES_CONFIGURATIONS_CACHEID, null);
+ CacheRequest<String, Map<String, SubClusterPolicyConfiguration>> cacheRequest =
+ new CacheRequest<String, Map<String, SubClusterPolicyConfiguration>>(
+ cacheKey,
+ new Func<String, Map<String, SubClusterPolicyConfiguration>>() {
+ @Override
+ public Map<String, SubClusterPolicyConfiguration> invoke(
+ String key) throws Exception {
+ GetSubClusterPoliciesConfigurationsResponse policyConfigs =
+ stateStore.getPoliciesConfigurations(
+ GetSubClusterPoliciesConfigurationsRequest
+ .newInstance());
+ return buildPolicyConfigMap(policyConfigs);
+ }
+ });
+ return cacheRequest;
+ }
+
+ protected String buildCacheKey(String typeName, String methodName,
+ String argName) {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(typeName).append(".");
+ buffer.append(methodName);
+ if (argName != null) {
+ buffer.append("::");
+ buffer.append(argName);
+ }
+ return buffer.toString();
+ }
+
+ /**
+ * Internal class that implements the CacheLoader interface that can be
+ * plugged into the CacheManager to load objects into the cache for specified
+ * keys.
+ */
+ private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public V load(K key) throws CacheLoaderException {
+ try {
+ CacheRequest<K, V> query = (CacheRequest<K, V>) key;
+ assert query != null;
+ return query.getValue();
+ } catch (Throwable ex) {
+ throw new CacheLoaderException(ex);
+ }
+ }
+
+ @Override
+ public Map<K, V> loadAll(Iterable<? extends K> keys)
+ throws CacheLoaderException {
+ // The FACADE does not use the Cache's getAll API. Hence this is not
+ // required to be implemented
+ throw new NotImplementedException();
+ }
+ }
+
+ /**
+ * Internal class that encapsulates the cache key and a function that returns
+ * the value for the specified key.
+ */
+ private static class CacheRequest<K, V> {
+ private K key;
+ private Func<K, V> func;
+
+ public CacheRequest(K key, Func<K, V> func) {
+ this.key = key;
+ this.func = func;
+ }
+
+ public V getValue() throws Exception {
+ return func.invoke(key);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((key == null) ? 0 : key.hashCode());
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ CacheRequest<K, V> other = (CacheRequest<K, V>) obj;
+ if (key == null) {
+ if (other.key != null) {
+ return false;
+ }
+ } else if (!key.equals(other.key)) {
+ return false;
+ }
+
+ return true;
+ }
+ }
+
+ /**
+ * Encapsulates a method that has one parameter and returns a value of the
+ * type specified by the TResult parameter.
+ */
+ protected interface Func<T, TResult> {
+ TResult invoke(T input) throws Exception;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d4c900/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java
new file mode 100644
index 0000000..39a46ec
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.utils;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d4c900/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
new file mode 100644
index 0000000..c179521
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.utils;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+
+/**
+ * Utility class for FederationStateStore unit tests.
+ */
+public class FederationStateStoreTestUtil {
+
+ private static final MonotonicClock CLOCK = new MonotonicClock();
+
+ public static final String SC_PREFIX = "SC-";
+ public static final String Q_PREFIX = "queue-";
+ public static final String POLICY_PREFIX = "policy-";
+
+ private FederationStateStore stateStore;
+
+ public FederationStateStoreTestUtil(FederationStateStore stateStore) {
+ this.stateStore = stateStore;
+ }
+
+ private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
+
+ String amRMAddress = "1.2.3.4:1";
+ String clientRMAddress = "1.2.3.4:2";
+ String rmAdminAddress = "1.2.3.4:3";
+ String webAppAddress = "1.2.3.4:4";
+
+ return SubClusterInfo.newInstance(subClusterId, amRMAddress,
+ clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW,
+ CLOCK.getTime(), "capability");
+ }
+
+ private void registerSubCluster(SubClusterId subClusterId)
+ throws YarnException {
+
+ SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
+ stateStore.registerSubCluster(
+ SubClusterRegisterRequest.newInstance(subClusterInfo));
+ }
+
+ public void registerSubClusters(int numSubClusters) throws YarnException {
+
+ for (int i = 0; i < numSubClusters; i++) {
+ registerSubCluster(SubClusterId.newInstance(SC_PREFIX + i));
+ }
+ }
+
+ private void addApplicationHomeSC(ApplicationId appId,
+ SubClusterId subClusterId) throws YarnException {
+ ApplicationHomeSubCluster ahsc =
+ ApplicationHomeSubCluster.newInstance(appId, subClusterId);
+ AddApplicationHomeSubClusterRequest request =
+ AddApplicationHomeSubClusterRequest.newInstance(ahsc);
+ stateStore.addApplicationHomeSubCluster(request);
+ }
+
+ public void addAppsHomeSC(long clusterTs, int numApps) throws YarnException {
+ for (int i = 0; i < numApps; i++) {
+ addApplicationHomeSC(ApplicationId.newInstance(clusterTs, i),
+ SubClusterId.newInstance(SC_PREFIX + i));
+ }
+ }
+
+ private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
+ String policyType) {
+ return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
+ ByteBuffer.allocate(1));
+ }
+
+ private void setPolicyConf(String queue, String policyType)
+ throws YarnException {
+ SetSubClusterPolicyConfigurationRequest request =
+ SetSubClusterPolicyConfigurationRequest
+ .newInstance(createSCPolicyConf(queue, policyType));
+ stateStore.setPolicyConfiguration(request);
+ }
+
+ public void addPolicyConfigs(int numQueues) throws YarnException {
+
+ for (int i = 0; i < numQueues; i++) {
+ setPolicyConf(Q_PREFIX + i, POLICY_PREFIX + i);
+ }
+ }
+
+ public SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
+ throws YarnException {
+ GetSubClusterInfoRequest request =
+ GetSubClusterInfoRequest.newInstance(subClusterId);
+ return stateStore.getSubCluster(request).getSubClusterInfo();
+ }
+
+ public SubClusterId queryApplicationHomeSC(ApplicationId appId)
+ throws YarnException {
+ GetApplicationHomeSubClusterRequest request =
+ GetApplicationHomeSubClusterRequest.newInstance(appId);
+
+ GetApplicationHomeSubClusterResponse response =
+ stateStore.getApplicationHomeSubCluster(request);
+
+ return response.getApplicationHomeSubCluster().getHomeSubCluster();
+ }
+
+ public SubClusterPolicyConfiguration queryPolicyConfiguration(String queue)
+ throws YarnException {
+ GetSubClusterPolicyConfigurationRequest request =
+ GetSubClusterPolicyConfigurationRequest.newInstance(queue);
+
+ GetSubClusterPolicyConfigurationResponse result =
+ stateStore.getPolicyConfiguration(request);
+ return result.getPolicyConfiguration();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d4c900/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java
new file mode 100644
index 0000000..53f4f84
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Unit tests for FederationStateStoreFacade.
+ */
+@RunWith(Parameterized.class)
+public class TestFederationStateStoreFacade {
+
+ @Parameters
+ public static Collection<Boolean[]> getParameters() {
+ return Arrays
+ .asList(new Boolean[][] {{Boolean.FALSE }, {Boolean.TRUE } });
+ }
+
+ private final long clusterTs = System.currentTimeMillis();
+ private final int numSubClusters = 3;
+ private final int numApps = 5;
+ private final int numQueues = 2;
+
+ private Configuration conf;
+ private FederationStateStore stateStore;
+ private FederationStateStoreTestUtil stateStoreTestUtil;
+ private FederationStateStoreFacade facade =
+ FederationStateStoreFacade.getInstance();
+
+ public TestFederationStateStoreFacade(Boolean isCachingEnabled) {
+ conf = new Configuration();
+ if (!(isCachingEnabled.booleanValue())) {
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException, YarnException {
+ stateStore = new MemoryFederationStateStore();
+ stateStore.init(conf);
+ facade.reinitialize(stateStore, conf);
+ // hydrate the store
+ stateStoreTestUtil = new FederationStateStoreTestUtil(stateStore);
+ stateStoreTestUtil.registerSubClusters(numSubClusters);
+ stateStoreTestUtil.addAppsHomeSC(clusterTs, numApps);
+ stateStoreTestUtil.addPolicyConfigs(numQueues);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ stateStore.close();
+ stateStore = null;
+ }
+
+ @Test
+ public void testGetSubCluster() throws YarnException {
+ for (int i = 0; i < numSubClusters; i++) {
+ SubClusterId subClusterId =
+ SubClusterId.newInstance(FederationStateStoreTestUtil.SC_PREFIX + i);
+ Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId),
+ facade.getSubCluster(subClusterId));
+ }
+ }
+
+ @Test
+ public void testGetSubClusterFlushCache() throws YarnException {
+ for (int i = 0; i < numSubClusters; i++) {
+ SubClusterId subClusterId =
+ SubClusterId.newInstance(FederationStateStoreTestUtil.SC_PREFIX + i);
+ Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId),
+ facade.getSubCluster(subClusterId, true));
+ }
+ }
+
+ @Test
+ public void testGetSubClusters() throws YarnException {
+ Map<SubClusterId, SubClusterInfo> subClusters =
+ facade.getSubClusters(false);
+ for (SubClusterId subClusterId : subClusters.keySet()) {
+ Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId),
+ subClusters.get(subClusterId));
+ }
+ }
+
+ @Test
+ public void testGetPolicyConfiguration() throws YarnException {
+ for (int i = 0; i < numQueues; i++) {
+ String queue = FederationStateStoreTestUtil.Q_PREFIX + i;
+ Assert.assertEquals(stateStoreTestUtil.queryPolicyConfiguration(queue),
+ facade.getPolicyConfiguration(queue));
+ }
+ }
+
+ @Test
+ public void testGetPoliciesConfigurations() throws YarnException {
+ Map<String, SubClusterPolicyConfiguration> queuePolicies =
+ facade.getPoliciesConfigurations();
+ for (String queue : queuePolicies.keySet()) {
+ Assert.assertEquals(stateStoreTestUtil.queryPolicyConfiguration(queue),
+ queuePolicies.get(queue));
+ }
+ }
+
+ @Test
+ public void testGetHomeSubClusterForApp() throws YarnException {
+ for (int i = 0; i < numApps; i++) {
+ ApplicationId appId = ApplicationId.newInstance(clusterTs, i);
+ Assert.assertEquals(stateStoreTestUtil.queryApplicationHomeSC(appId),
+ facade.getApplicationHomeSubCluster(appId));
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org