You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2018/08/02 17:18:13 UTC
[50/50] [abbrv] hadoop git commit: YARN-7707. [GPG] Policy generator
framework. Contributed by Young Chen
YARN-7707. [GPG] Policy generator framework. Contributed by Young Chen
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6800cf70
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6800cf70
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6800cf70
Branch: refs/heads/YARN-7402
Commit: 6800cf7015d81cc0085ad0f9159e246842e72187
Parents: f833e1b
Author: Botong Huang <bo...@apache.org>
Authored: Fri Mar 23 17:07:10 2018 -0700
Committer: Botong Huang <bo...@apache.org>
Committed: Thu Aug 2 09:59:48 2018 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 36 +-
.../src/main/resources/yarn-default.xml | 40 +++
.../utils/FederationStateStoreFacade.java | 13 +
.../pom.xml | 18 +
.../globalpolicygenerator/GPGContext.java | 4 +
.../globalpolicygenerator/GPGContextImpl.java | 10 +
.../globalpolicygenerator/GPGPolicyFacade.java | 220 ++++++++++++
.../server/globalpolicygenerator/GPGUtils.java | 80 +++++
.../GlobalPolicyGenerator.java | 17 +
.../policygenerator/GlobalPolicy.java | 76 +++++
.../policygenerator/NoOpGlobalPolicy.java | 36 ++
.../policygenerator/PolicyGenerator.java | 261 ++++++++++++++
.../UniformWeightedLocalityGlobalPolicy.java | 71 ++++
.../policygenerator/package-info.java | 24 ++
.../TestGPGPolicyFacade.java | 202 +++++++++++
.../policygenerator/TestPolicyGenerator.java | 338 +++++++++++++++++++
.../src/test/resources/schedulerInfo1.json | 134 ++++++++
.../src/test/resources/schedulerInfo2.json | 196 +++++++++++
18 files changed, 1775 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index ec88411..61535fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3342,7 +3342,7 @@ public class YarnConfiguration extends Configuration {
public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
false;
- private static final String FEDERATION_GPG_PREFIX =
+ public static final String FEDERATION_GPG_PREFIX =
FEDERATION_PREFIX + "gpg.";
// The number of threads to use for the GPG scheduled executor service
@@ -3360,6 +3360,40 @@ public class YarnConfiguration extends Configuration {
FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms";
public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000;
+ public static final String FEDERATION_GPG_POLICY_PREFIX =
+ FEDERATION_GPG_PREFIX + "policy.generator.";
+
+ /** The interval at which the policy generator runs, default is one hour. */
+ public static final String GPG_POLICY_GENERATOR_INTERVAL_MS =
+ FEDERATION_GPG_POLICY_PREFIX + "interval-ms";
+ public static final long DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS = -1;
+
+ /**
+ * The configured policy generator class, runs NoOpGlobalPolicy by
+ * default.
+ */
+ public static final String GPG_GLOBAL_POLICY_CLASS =
+ FEDERATION_GPG_POLICY_PREFIX + "class";
+ public static final String DEFAULT_GPG_GLOBAL_POLICY_CLASS =
+ "org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator."
+ + "NoOpGlobalPolicy";
+
+ /**
+ * Whether or not the policy generator is running in read only (won't modify
+ * policies), default is false.
+ */
+ public static final String GPG_POLICY_GENERATOR_READONLY =
+ FEDERATION_GPG_POLICY_PREFIX + "readonly";
+ public static final boolean DEFAULT_GPG_POLICY_GENERATOR_READONLY =
+ false;
+
+ /**
+ * Which sub-clusters the policy generator should blacklist.
+ */
+ public static final String GPG_POLICY_GENERATOR_BLACKLIST =
+ FEDERATION_GPG_POLICY_PREFIX + "blacklist";
+
+
////////////////////////////////
// Other Configs
////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 66493f3..755f3e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3557,6 +3557,46 @@
<property>
<description>
+ The interval at which the policy generator runs, default is one hour
+ </description>
+ <name>yarn.federation.gpg.policy.generator.interval-ms</name>
+ <value>3600000</value>
+ </property>
+
+ <property>
+ <description>
+ The configured policy generator class, runs NoOpGlobalPolicy by default
+ </description>
+ <name>yarn.federation.gpg.policy.generator.class</name>
+ <value>org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.NoOpGlobalPolicy</value>
+ </property>
+
+ <property>
+ <description>
+ Whether or not the policy generator is running in read only (won't modify policies), default is false
+ </description>
+ <name>yarn.federation.gpg.policy.generator.readonly</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <description>
+ Whether or not the policy generator is running in read only (won't modify policies), default is false
+ </description>
+ <name>yarn.federation.gpg.policy.generator.readonly</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <description>
+ Which subclusters the gpg should blacklist, default is none
+ </description>
+ <name>yarn.federation.gpg.policy.generator.blacklist</name>
+ <value></value>
+ </property>
+
+ <property>
+ <description>
It is TimelineClient 1.5 configuration whether to store active
application’s timeline data with in user directory i.e
${yarn.timeline-service.entity-group-fs-store.active-dir}/${user.name}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 4c3bed0..25a9e52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolic
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -373,6 +374,18 @@ public final class FederationStateStoreFacade {
}
/**
+ * Set a policy configuration into the state store.
+ *
+ * @param policyConf the policy configuration to set
+ * @throws YarnException if the request is invalid/fails
+ */
+ public void setPolicyConfiguration(SubClusterPolicyConfiguration policyConf)
+ throws YarnException {
+ stateStore.setPolicyConfiguration(
+ SetSubClusterPolicyConfigurationRequest.newInstance(policyConf));
+ }
+
+ /**
* Adds the home {@link SubClusterId} for the specified {@link ApplicationId}.
*
* @param appHomeSubCluster the mapping of the application to it's home
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
index 9bbb936..9398b0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
@@ -63,6 +63,12 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
</dependency>
@@ -73,6 +79,12 @@
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
<type>test-jar</type>
@@ -92,6 +104,12 @@
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/test/resources/schedulerInfo1.json</exclude>
+ <exclude>src/test/resources/schedulerInfo2.json</exclude>
+ </excludes>
+ </configuration>
</plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
index da8a383..6b0a5a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
@@ -28,4 +28,8 @@ public interface GPGContext {
FederationStateStoreFacade getStateStoreFacade();
void setStateStoreFacade(FederationStateStoreFacade facade);
+
+ GPGPolicyFacade getPolicyFacade();
+
+ void setPolicyFacade(GPGPolicyFacade facade);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
index 3884ace..bb49844 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
public class GPGContextImpl implements GPGContext {
private FederationStateStoreFacade facade;
+ private GPGPolicyFacade policyFacade;
@Override
public FederationStateStoreFacade getStateStoreFacade() {
@@ -38,4 +39,13 @@ public class GPGContextImpl implements GPGContext {
this.facade = federationStateStoreFacade;
}
+ @Override
+ public GPGPolicyFacade getPolicyFacade(){
+ return policyFacade;
+ }
+
+ @Override
+ public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){
+ policyFacade = gpgPolicyfacade;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java
new file mode 100644
index 0000000..4c61a14
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A utility class for the GPG Policy Generator to read and write policies
+ * into the FederationStateStore. Policy specific logic is abstracted away in
+ * this class, so the PolicyGenerator can avoid dealing with policy
+ * construction, reinitialization, and serialization.
+ *
+ * There are only two exposed methods:
+ *
+ * {@link #getPolicyManager(String)}
+ * Gets the PolicyManager via queue name. Null if there is no policy
+ * configured for the specified queue. The PolicyManager can be used to
+ * extract the {@link FederationRouterPolicy} and
+ * {@link FederationAMRMProxyPolicy}, as well as any policy specific parameters
+ *
+ * {@link #setPolicyManager(FederationPolicyManager)}
+ * Sets the PolicyManager. If the policy configuration is the same, no change
+ * occurs. Otherwise, the internal cache is updated and the new configuration
+ * is written into the FederationStateStore
+ *
+ * This class assumes that the GPG is the only service
+ * writing policies. Thus, the only FederationStateStore reads occur the first
+ * time a queue policy is retrieved - after that, the GPG only writes to the
+ * FederationStateStore.
+ *
+ * The class uses a PolicyManager cache and a SubClusterPolicyConfiguration
+ * cache. The primary use for these caches are to serve reads, and to
+ * identify when the PolicyGenerator has actually changed the policy
+ * so unnecessary FederationStateStore policy writes can be avoided.
+ */
+
+public class GPGPolicyFacade {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GPGPolicyFacade.class);
+
+ private FederationStateStoreFacade stateStore;
+
+ private Map<String, FederationPolicyManager> policyManagerMap;
+ private Map<String, SubClusterPolicyConfiguration> policyConfMap;
+
+ private boolean readOnly;
+
+ public GPGPolicyFacade(FederationStateStoreFacade stateStore,
+ Configuration conf) {
+ this.stateStore = stateStore;
+ this.policyManagerMap = new HashMap<>();
+ this.policyConfMap = new HashMap<>();
+ this.readOnly =
+ conf.getBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY,
+ YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_READONLY);
+ }
+
+ /**
+ * Provides a utility for the policy generator to read the policy manager
+ * from the FederationStateStore. Because the policy generator should be the
+ * only component updating the policy, this implementation does not use the
+ * reinitialization feature.
+ *
+ * @param queueName the name of the queue we want the policy manager for.
+ * @return the policy manager responsible for the queue policy.
+ */
+ public FederationPolicyManager getPolicyManager(String queueName)
+ throws YarnException {
+ FederationPolicyManager policyManager = policyManagerMap.get(queueName);
+ // If we don't have the policy manager cached, pull configuration
+ // from the FederationStateStore to create and cache it
+ if (policyManager == null) {
+ try {
+ // If we don't have the configuration cached, pull it
+ // from the stateStore
+ SubClusterPolicyConfiguration conf = policyConfMap.get(queueName);
+ if (conf == null) {
+ conf = stateStore.getPolicyConfiguration(queueName);
+ }
+ // If configuration is still null, it does not exist in the
+ // FederationStateStore
+ if (conf == null) {
+ LOG.info("Read null policy for queue {}", queueName);
+ return null;
+ }
+ policyManager =
+ FederationPolicyUtils.instantiatePolicyManager(conf.getType());
+ policyManager.setQueue(queueName);
+
+ // TODO there is currently no way to cleanly deserialize a policy
+ // manager sub type from just the configuration
+ if (policyManager instanceof WeightedLocalityPolicyManager) {
+ WeightedPolicyInfo wpinfo =
+ WeightedPolicyInfo.fromByteBuffer(conf.getParams());
+ WeightedLocalityPolicyManager wlpmanager =
+ (WeightedLocalityPolicyManager) policyManager;
+ LOG.info("Updating policy for queue {} to configured weights router: "
+ + "{}, amrmproxy: {}", queueName,
+ wpinfo.getRouterPolicyWeights(),
+ wpinfo.getAMRMPolicyWeights());
+ wlpmanager.setWeightedPolicyInfo(wpinfo);
+ } else {
+ LOG.warn("Warning: FederationPolicyManager of unsupported type {}, "
+ + "initialization may be incomplete ", policyManager.getClass());
+ }
+
+ policyManagerMap.put(queueName, policyManager);
+ policyConfMap.put(queueName, conf);
+ } catch (YarnException e) {
+ LOG.error("Error reading SubClusterPolicyConfiguration from state "
+ + "store for queue: {}", queueName);
+ throw e;
+ }
+ }
+ return policyManager;
+ }
+
+ /**
+ * Provides a utility for the policy generator to write a policy manager
+ * into the FederationStateStore. The facade keeps a cache and will only write
+ * into the FederationStateStore if the policy configuration has changed.
+ *
+ * @param policyManager The policy manager we want to update into the state
+ * store. It contains policy information as well as
+ * the queue name we will update for.
+ */
+ public void setPolicyManager(FederationPolicyManager policyManager)
+ throws YarnException {
+ if (policyManager == null) {
+ LOG.warn("Attempting to set null policy manager");
+ return;
+ }
+ // Extract the configuration from the policy manager
+ String queue = policyManager.getQueue();
+ SubClusterPolicyConfiguration conf;
+ try {
+ conf = policyManager.serializeConf();
+ } catch (FederationPolicyInitializationException e) {
+ LOG.warn("Error serializing policy for queue {}", queue);
+ throw e;
+ }
+ if (conf == null) {
+ // State store does not currently support setting a policy back to null
+ // because it reads the queue name to set from the policy!
+ LOG.warn("Skip setting policy to null for queue {} into state store",
+ queue);
+ return;
+ }
+ // Compare with configuration cache, if different, write the conf into
+ // store and update our conf and manager cache
+ if (!confCacheEqual(queue, conf)) {
+ try {
+ if (readOnly) {
+ LOG.info("[read-only] Skipping policy update for queue {}", queue);
+ return;
+ }
+ LOG.info("Updating policy for queue {} into state store", queue);
+ stateStore.setPolicyConfiguration(conf);
+ policyConfMap.put(queue, conf);
+ policyManagerMap.put(queue, policyManager);
+ } catch (YarnException e) {
+ LOG.warn("Error writing SubClusterPolicyConfiguration to state "
+ + "store for queue: {}", queue);
+ throw e;
+ }
+ } else {
+ LOG.info("Setting unchanged policy - state store write skipped");
+ }
+ }
+
+ /**
+ * @param queue the queue to check the cached policy configuration for
+ * @param conf the new policy configuration
+ * @return whether or not the conf is equal to the cached conf
+ */
+ private boolean confCacheEqual(String queue,
+ SubClusterPolicyConfiguration conf) {
+ SubClusterPolicyConfiguration cachedConf = policyConfMap.get(queue);
+ if (conf == null && cachedConf == null) {
+ return true;
+ } else if (conf != null && cachedConf != null) {
+ if (conf.equals(cachedConf)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
new file mode 100644
index 0000000..429bec4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+
+/**
+ * GPGUtils contains utility functions for the GPG.
+ *
+ */
+public final class GPGUtils {
+
+ // hide constructor
+ private GPGUtils() {
+ }
+
+ /**
+ * Performs an invocation of the the remote RMWebService.
+ */
+ public static <T> T invokeRMWebService(Configuration conf, String webAddr,
+ String path, final Class<T> returnType) {
+ Client client = Client.create();
+ T obj = null;
+
+ WebResource webResource = client.resource(webAddr);
+ ClientResponse response = webResource.path("ws/v1/cluster").path(path)
+ .accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
+ if (response.getStatus() == HttpServletResponse.SC_OK) {
+ obj = response.getEntity(returnType);
+ } else {
+ throw new YarnRuntimeException("Bad response from remote web service: "
+ + response.getStatus());
+ }
+ return obj;
+ }
+
+ /**
+ * Creates a uniform weighting of 1.0 for each sub cluster.
+ */
+ public static Map<SubClusterIdInfo, Float> createUniformWeights(
+ Set<SubClusterId> ids) {
+ Map<SubClusterIdInfo, Float> weights =
+ new HashMap<>();
+ for(SubClusterId id : ids) {
+ weights.put(new SubClusterIdInfo(id), 1.0f);
+ }
+ return weights;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index f6cfba0..88b9f2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +63,7 @@ public class GlobalPolicyGenerator extends CompositeService {
// Scheduler service that runs tasks periodically
private ScheduledThreadPoolExecutor scheduledExecutorService;
private SubClusterCleaner subClusterCleaner;
+ private PolicyGenerator policyGenerator;
public GlobalPolicyGenerator() {
super(GlobalPolicyGenerator.class.getName());
@@ -73,11 +75,15 @@ public class GlobalPolicyGenerator extends CompositeService {
// Set up the context
this.gpgContext
.setStateStoreFacade(FederationStateStoreFacade.getInstance());
+ this.gpgContext
+ .setPolicyFacade(new GPGPolicyFacade(
+ this.gpgContext.getStateStoreFacade(), conf));
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
+ this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);
DefaultMetricsSystem.initialize(METRICS_NAME);
@@ -99,6 +105,17 @@ public class GlobalPolicyGenerator extends CompositeService {
LOG.info("Scheduled sub-cluster cleaner with interval: {}",
DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
}
+
+ // Schedule PolicyGenerator
+ long policyGeneratorIntervalMillis = getConfig().getLong(
+ YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS);
+ if(policyGeneratorIntervalMillis > 0){
+ this.scheduledExecutorService.scheduleAtFixedRate(this.policyGenerator,
+ 0, policyGeneratorIntervalMillis, TimeUnit.MILLISECONDS);
+ LOG.info("Scheduled policygenerator with interval: {}",
+ DurationFormatUtils.formatDurationISO(policyGeneratorIntervalMillis));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
new file mode 100644
index 0000000..38d762d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * This interface defines the plug-able policy that the PolicyGenerator uses
+ * to update policies into the state store.
+ */
+
+public abstract class GlobalPolicy implements Configurable {
+
+ private Configuration conf;
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Return a map of the object type and RM path to request it from - the
+ * framework will query these paths and provide the objects to the policy.
+ * Delegating this responsibility to the PolicyGenerator enables us to avoid
+ * duplicate calls to the same * endpoints as the GlobalPolicy is invoked
+ * once per queue.
+ */
+ protected Map<Class, String> registerPaths() {
+ // Default register nothing
+ return Collections.emptyMap();
+ }
+
+ /**
+ * Given a queue, cluster metrics, and policy manager, update the policy
+ * to account for the cluster status. This method defines the policy generator
+ * behavior.
+ *
+ * @param queueName name of the queue
+ * @param clusterInfo subClusterId map to cluster information about the
+ * SubCluster used to make policy decisions
+ * @param manager the FederationPolicyManager for the queue's existing
+ * policy the manager may be null, in which case the policy
+ * will need to be created
+ * @return policy manager that handles the updated (or created) policy
+ */
+ protected abstract FederationPolicyManager updatePolicy(String queueName,
+ Map<SubClusterId, Map<Class, Object>> clusterInfo,
+ FederationPolicyManager manager);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java
new file mode 100644
index 0000000..c2d578f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+import java.util.Map;
+
+/**
+ * Default policy that does not update any policy configurations.
+ */
+public class NoOpGlobalPolicy extends GlobalPolicy{
+
+ @Override
+ public FederationPolicyManager updatePolicy(String queueName,
+ Map<SubClusterId, Map<Class, Object>> clusterInfo,
+ FederationPolicyManager manager) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
new file mode 100644
index 0000000..5681ff0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The PolicyGenerator runs periodically and updates the policy configuration
+ * for each queue into the FederationStateStore. The policy update behavior is
+ * defined by the GlobalPolicy instance that is used.
+ */
+
+public class PolicyGenerator implements Runnable, Configurable {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PolicyGenerator.class);
+
+ private GPGContext gpgContext;
+ private Configuration conf;
+
+ // Information request map
+ private Map<Class, String> pathMap = new HashMap<>();
+
+ // Global policy instance
+ @VisibleForTesting
+ protected GlobalPolicy policy;
+
+ /**
+ * The PolicyGenerator periodically reads SubCluster load and updates
+ * policies into the FederationStateStore.
+ */
+ public PolicyGenerator(Configuration conf, GPGContext context) {
+ setConf(conf);
+ init(context);
+ }
+
+ private void init(GPGContext context) {
+ this.gpgContext = context;
+ LOG.info("Initialized PolicyGenerator");
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ this.policy = FederationStateStoreFacade
+ .createInstance(conf, YarnConfiguration.GPG_GLOBAL_POLICY_CLASS,
+ YarnConfiguration.DEFAULT_GPG_GLOBAL_POLICY_CLASS,
+ GlobalPolicy.class);
+ policy.setConf(conf);
+ pathMap.putAll(policy.registerPaths());
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ public final void run() {
+ Map<SubClusterId, SubClusterInfo> activeSubClusters;
+ try {
+ activeSubClusters = gpgContext.getStateStoreFacade().getSubClusters(true);
+ } catch (YarnException e) {
+ LOG.error("Error retrieving active sub-clusters", e);
+ return;
+ }
+
+ // Parse the scheduler information from all the SCs
+ Map<SubClusterId, SchedulerInfo> schedInfo =
+ getSchedulerInfo(activeSubClusters);
+
+ // Extract and enforce that all the schedulers have matching type
+ Set<String> queueNames = extractQueues(schedInfo);
+
+ // Remove black listed SubClusters
+ activeSubClusters.keySet().removeAll(getBlackList());
+ LOG.info("Active non-blacklist sub-clusters: {}",
+ activeSubClusters.keySet());
+
+ // Get cluster metrics information from non black listed RMs - later used
+ // to evaluate SubCluster load
+ Map<SubClusterId, Map<Class, Object>> clusterInfo =
+ getInfos(activeSubClusters);
+
+ // Update into the FederationStateStore
+ for (String queueName : queueNames) {
+ // Retrieve the manager from the policy facade
+ FederationPolicyManager manager;
+ try {
+ manager = this.gpgContext.getPolicyFacade().getPolicyManager(queueName);
+ } catch (YarnException e) {
+ LOG.error("GetPolicy for queue {} failed", queueName, e);
+ continue;
+ }
+ LOG.info("Updating policy for queue {}", queueName);
+ manager = policy.updatePolicy(queueName, clusterInfo, manager);
+ try {
+ this.gpgContext.getPolicyFacade().setPolicyManager(manager);
+ } catch (YarnException e) {
+ LOG.error("SetPolicy for queue {} failed", queueName, e);
+ }
+ }
+ }
+
+ /**
+ * Helper to retrieve metrics from the RM REST endpoints.
+ *
+ * @param activeSubClusters A map of active SubCluster IDs to info
+ */
+ @VisibleForTesting
+ protected Map<SubClusterId, Map<Class, Object>> getInfos(
+ Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+
+ Map<SubClusterId, Map<Class, Object>> clusterInfo = new HashMap<>();
+ for (SubClusterInfo sci : activeSubClusters.values()) {
+ for (Map.Entry<Class, String> e : this.pathMap.entrySet()) {
+ if (!clusterInfo.containsKey(sci.getSubClusterId())) {
+ clusterInfo.put(sci.getSubClusterId(), new HashMap<Class, Object>());
+ }
+ Object ret = GPGUtils
+ .invokeRMWebService(conf, sci.getRMWebServiceAddress(),
+ e.getValue(), e.getKey());
+ clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret);
+ }
+ }
+
+ return clusterInfo;
+ }
+
+ /**
+ * Helper to retrieve SchedulerInfos.
+ *
+ * @param activeSubClusters A map of active SubCluster IDs to info
+ */
+ @VisibleForTesting
+ protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
+ Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+ Map<SubClusterId, SchedulerInfo> schedInfo =
+ new HashMap<>();
+ for (SubClusterInfo sci : activeSubClusters.values()) {
+ SchedulerTypeInfo sti = GPGUtils
+ .invokeRMWebService(conf, sci.getRMWebServiceAddress(),
+ RMWSConsts.SCHEDULER, SchedulerTypeInfo.class);
+ if(sti != null){
+ schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo());
+ } else {
+ LOG.warn("Skipped null scheduler info from SubCluster " + sci
+ .getSubClusterId().toString());
+ }
+ }
+ return schedInfo;
+ }
+
+ /**
+ * Helper to get a set of blacklisted SubCluster Ids from configuration.
+ */
+ private Set<SubClusterId> getBlackList() {
+ String blackListParam =
+ conf.get(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST);
+ if(blackListParam == null){
+ return Collections.emptySet();
+ }
+ Set<SubClusterId> blackList = new HashSet<>();
+ for (String id : blackListParam.split(",")) {
+ blackList.add(SubClusterId.newInstance(id));
+ }
+ return blackList;
+ }
+
+ /**
+ * Given the scheduler information for all RMs, extract the union of
+ * queue names - right now we only consider instances of capacity scheduler.
+ *
+ * @param schedInfo the scheduler information
+ * @return a set of queue names
+ */
+ private Set<String> extractQueues(
+ Map<SubClusterId, SchedulerInfo> schedInfo) {
+ Set<String> queueNames = new HashSet<String>();
+ for (Map.Entry<SubClusterId, SchedulerInfo> entry : schedInfo.entrySet()) {
+ if (entry.getValue() instanceof CapacitySchedulerInfo) {
+ // Flatten the queue structure and get only non leaf queues
+ queueNames.addAll(flattenQueue((CapacitySchedulerInfo) entry.getValue())
+ .get(CapacitySchedulerQueueInfo.class));
+ } else {
+ LOG.warn("Skipping SubCluster {}, not configured with capacity "
+ + "scheduler", entry.getKey());
+ }
+ }
+ return queueNames;
+ }
+
+ // Helpers to flatten the queue structure into a multimap of
+ // queue type to set of queue names
+ private Map<Class, Set<String>> flattenQueue(CapacitySchedulerInfo csi) {
+ Map<Class, Set<String>> flattened = new HashMap<Class, Set<String>>();
+ addOrAppend(flattened, csi.getClass(), csi.getQueueName());
+ for (CapacitySchedulerQueueInfo csqi : csi.getQueues().getQueueInfoList()) {
+ flattenQueue(csqi, flattened);
+ }
+ return flattened;
+ }
+
+ private void flattenQueue(CapacitySchedulerQueueInfo csi,
+ Map<Class, Set<String>> flattened) {
+ addOrAppend(flattened, csi.getClass(), csi.getQueueName());
+ if (csi.getQueues() != null) {
+ for (CapacitySchedulerQueueInfo csqi : csi.getQueues()
+ .getQueueInfoList()) {
+ flattenQueue(csqi, flattened);
+ }
+ }
+ }
+
+ private <K, V> void addOrAppend(Map<K, Set<V>> multimap, K key, V value) {
+ if (!multimap.containsKey(key)) {
+ multimap.put(key, new HashSet<V>());
+ }
+ multimap.get(key).add(value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java
new file mode 100644
index 0000000..826cb02
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.commons.math3.optim.nonlinear.vector.Weight;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Simple policy that generates and updates uniform weighted locality
+ * policies.
+ */
+public class UniformWeightedLocalityGlobalPolicy extends GlobalPolicy{
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(UniformWeightedLocalityGlobalPolicy.class);
+
+ @Override
+ protected FederationPolicyManager updatePolicy(String queueName,
+ Map<SubClusterId, Map<Class, Object>> clusterInfo,
+ FederationPolicyManager currentManager){
+ if(currentManager == null){
+ // Set uniform weights for all SubClusters
+ LOG.info("Creating uniform weighted policy queue {}", queueName);
+ WeightedLocalityPolicyManager manager =
+ new WeightedLocalityPolicyManager();
+ manager.setQueue(queueName);
+ Map<SubClusterIdInfo, Float> policyWeights =
+ GPGUtils.createUniformWeights(clusterInfo.keySet());
+ manager.getWeightedPolicyInfo().setAMRMPolicyWeights(policyWeights);
+ manager.getWeightedPolicyInfo().setRouterPolicyWeights(policyWeights);
+ currentManager = manager;
+ }
+ if(currentManager instanceof WeightedLocalityPolicyManager){
+ LOG.info("Updating policy for queue {} to default weights", queueName);
+ WeightedLocalityPolicyManager wlpmanager =
+ (WeightedLocalityPolicyManager) currentManager;
+ wlpmanager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+ GPGUtils.createUniformWeights(clusterInfo.keySet()));
+ wlpmanager.getWeightedPolicyInfo().setRouterPolicyWeights(
+ GPGUtils.createUniformWeights(clusterInfo.keySet()));
+ } else {
+ LOG.info("Policy for queue {} is of type {}, expected {}",
+ queueName, currentManager.getClass(), Weight.class);
+ }
+ return currentManager;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java
new file mode 100644
index 0000000..e8ff436
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes comprising the policy generator for the GPG. Responsibilities include
+ * generating and updating policies based on the cluster status.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java
new file mode 100644
index 0000000..d78c11f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Unit test for GPG Policy Facade.
+ */
+public class TestGPGPolicyFacade {
+
+ private Configuration conf;
+ private FederationStateStore stateStore;
+ private FederationStateStoreFacade facade =
+ FederationStateStoreFacade.getInstance();
+ private GPGPolicyFacade policyFacade;
+
+ private Set<SubClusterId> subClusterIds;
+
+ private SubClusterPolicyConfiguration testConf;
+
+ private static final String TEST_QUEUE = "test-queue";
+
+ public TestGPGPolicyFacade() {
+ conf = new Configuration();
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+ subClusterIds = new HashSet<>();
+ subClusterIds.add(SubClusterId.newInstance("sc0"));
+ subClusterIds.add(SubClusterId.newInstance("sc1"));
+ subClusterIds.add(SubClusterId.newInstance("sc2"));
+ }
+
+ @Before
+ public void setUp() throws IOException, YarnException {
+ stateStore = new MemoryFederationStateStore();
+ stateStore.init(conf);
+ facade.reinitialize(stateStore, conf);
+ policyFacade = new GPGPolicyFacade(facade, conf);
+ WeightedLocalityPolicyManager manager =
+ new WeightedLocalityPolicyManager();
+ // Add a test policy for test queue
+ manager.setQueue(TEST_QUEUE);
+ manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+ GPGUtils.createUniformWeights(subClusterIds));
+ manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+ GPGUtils.createUniformWeights(subClusterIds));
+ testConf = manager.serializeConf();
+ stateStore.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest
+ .newInstance(testConf));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ stateStore.close();
+ stateStore = null;
+ }
+
+ @Test
+ public void testGetPolicy() throws YarnException {
+ WeightedLocalityPolicyManager manager =
+ (WeightedLocalityPolicyManager) policyFacade
+ .getPolicyManager(TEST_QUEUE);
+ Assert.assertEquals(testConf, manager.serializeConf());
+ }
+
+ /**
+ * Test that new policies are written into the state store.
+ */
+ @Test
+ public void testSetNewPolicy() throws YarnException {
+ WeightedLocalityPolicyManager manager =
+ new WeightedLocalityPolicyManager();
+ manager.setQueue(TEST_QUEUE + 0);
+ manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+ GPGUtils.createUniformWeights(subClusterIds));
+ manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+ GPGUtils.createUniformWeights(subClusterIds));
+ SubClusterPolicyConfiguration policyConf = manager.serializeConf();
+ policyFacade.setPolicyManager(manager);
+
+ manager =
+ (WeightedLocalityPolicyManager) policyFacade
+ .getPolicyManager(TEST_QUEUE + 0);
+ Assert.assertEquals(policyConf, manager.serializeConf());
+ }
+
+ /**
+ * Test that overwriting policies are updated in the state store.
+ */
+ @Test
+ public void testOverwritePolicy() throws YarnException {
+ subClusterIds.add(SubClusterId.newInstance("sc3"));
+ WeightedLocalityPolicyManager manager =
+ new WeightedLocalityPolicyManager();
+ manager.setQueue(TEST_QUEUE);
+ manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+ GPGUtils.createUniformWeights(subClusterIds));
+ manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+ GPGUtils.createUniformWeights(subClusterIds));
+ SubClusterPolicyConfiguration policyConf = manager.serializeConf();
+ policyFacade.setPolicyManager(manager);
+
+ manager =
+ (WeightedLocalityPolicyManager) policyFacade
+ .getPolicyManager(TEST_QUEUE);
+ Assert.assertEquals(policyConf, manager.serializeConf());
+ }
+
+ /**
+ * Test that the write through cache works.
+ */
+ @Test
+ public void testWriteCache() throws YarnException {
+ stateStore = mock(MemoryFederationStateStore.class);
+ facade.reinitialize(stateStore, conf);
+ when(stateStore.getPolicyConfiguration(Matchers.any(
+ GetSubClusterPolicyConfigurationRequest.class))).thenReturn(
+ GetSubClusterPolicyConfigurationResponse.newInstance(testConf));
+ policyFacade = new GPGPolicyFacade(facade, conf);
+
+ // Query once to fill the cache
+ FederationPolicyManager manager = policyFacade.getPolicyManager(TEST_QUEUE);
+ // State store should be contacted once
+ verify(stateStore, times(1)).getPolicyConfiguration(
+ Matchers.any(GetSubClusterPolicyConfigurationRequest.class));
+
+ // If we set the same policy, the state store should be untouched
+ policyFacade.setPolicyManager(manager);
+ verify(stateStore, times(0)).setPolicyConfiguration(
+ Matchers.any(SetSubClusterPolicyConfigurationRequest.class));
+ }
+
+ /**
+ * Test that when read only is enabled, the state store is not changed.
+ */
+ @Test
+ public void testReadOnly() throws YarnException {
+ conf.setBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY, true);
+ stateStore = mock(MemoryFederationStateStore.class);
+ facade.reinitialize(stateStore, conf);
+ when(stateStore.getPolicyConfiguration(Matchers.any(
+ GetSubClusterPolicyConfigurationRequest.class))).thenReturn(
+ GetSubClusterPolicyConfigurationResponse.newInstance(testConf));
+ policyFacade = new GPGPolicyFacade(facade, conf);
+
+ // If we set a policy, the state store should be untouched
+ WeightedLocalityPolicyManager manager =
+ new WeightedLocalityPolicyManager();
+ // Add a test policy for test queue
+ manager.setQueue(TEST_QUEUE);
+ manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+ GPGUtils.createUniformWeights(subClusterIds));
+ manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+ GPGUtils.createUniformWeights(subClusterIds));
+ policyFacade.setPolicyManager(manager);
+ verify(stateStore, times(0)).setPolicyConfiguration(
+ Matchers.any(SetSubClusterPolicyConfigurationRequest.class));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
new file mode 100644
index 0000000..9d27b3b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONUnmarshaller;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGPolicyFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for GPG Policy Generator.
+ */
+public class TestPolicyGenerator {
+
+ private static final int NUM_SC = 3;
+
+ private Configuration conf;
+ private FederationStateStore stateStore;
+ private FederationStateStoreFacade facade =
+ FederationStateStoreFacade.getInstance();
+
+ private List<SubClusterId> subClusterIds;
+ private Map<SubClusterId, SubClusterInfo> subClusterInfos;
+ private Map<SubClusterId, Map<Class, Object>> clusterInfos;
+ private Map<SubClusterId, SchedulerInfo> schedulerInfos;
+
+ private GPGContext gpgContext;
+
+ private PolicyGenerator policyGenerator;
+
+ public TestPolicyGenerator() {
+ conf = new Configuration();
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+ gpgContext = new GPGContextImpl();
+ gpgContext.setPolicyFacade(new GPGPolicyFacade(facade, conf));
+ gpgContext.setStateStoreFacade(facade);
+ }
+
+ @Before
+ public void setUp() throws IOException, YarnException, JAXBException {
+ subClusterIds = new ArrayList<>();
+ subClusterInfos = new HashMap<>();
+ clusterInfos = new HashMap<>();
+ schedulerInfos = new HashMap<>();
+
+ CapacitySchedulerInfo sti1 =
+ readJSON("src/test/resources/schedulerInfo1.json",
+ CapacitySchedulerInfo.class);
+ CapacitySchedulerInfo sti2 =
+ readJSON("src/test/resources/schedulerInfo2.json",
+ CapacitySchedulerInfo.class);
+
+ // Set up sub clusters
+ for (int i = 0; i < NUM_SC; ++i) {
+ // Sub cluster Id
+ SubClusterId id = SubClusterId.newInstance("sc" + i);
+ subClusterIds.add(id);
+
+ // Sub cluster info
+ SubClusterInfo cluster = SubClusterInfo
+ .newInstance(id, "amrm:" + i, "clientrm:" + i, "rmadmin:" + i,
+ "rmweb:" + i, SubClusterState.SC_RUNNING, 0, "");
+ subClusterInfos.put(id, cluster);
+
+ // Cluster metrics info
+ ClusterMetricsInfo metricsInfo = new ClusterMetricsInfo();
+ metricsInfo.setAppsPending(2000);
+ if (!clusterInfos.containsKey(id)) {
+ clusterInfos.put(id, new HashMap<Class, Object>());
+ }
+ clusterInfos.get(id).put(ClusterMetricsInfo.class, metricsInfo);
+
+ schedulerInfos.put(id, sti1);
+ }
+
+ // Change one of the sub cluster schedulers
+ schedulerInfos.put(subClusterIds.get(0), sti2);
+
+ stateStore = mock(FederationStateStore.class);
+ when(stateStore.getSubClusters((GetSubClustersInfoRequest) any()))
+ .thenReturn(GetSubClustersInfoResponse.newInstance(
+ new ArrayList<SubClusterInfo>(subClusterInfos.values())));
+ facade.reinitialize(stateStore, conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ stateStore.close();
+ stateStore = null;
+ }
+
+ private <T> T readJSON(String pathname, Class<T> classy)
+ throws IOException, JAXBException {
+
+ JSONJAXBContext jc =
+ new JSONJAXBContext(JSONConfiguration.mapped().build(), classy);
+ JSONUnmarshaller unmarshaller = jc.createJSONUnmarshaller();
+ String contents = new String(Files.readAllBytes(Paths.get(pathname)));
+ return unmarshaller.unmarshalFromJSON(new StringReader(contents), classy);
+
+ }
+
+ @Test
+ public void testPolicyGenerator() throws YarnException {
+ policyGenerator = new TestablePolicyGenerator();
+ policyGenerator.policy = mock(GlobalPolicy.class);
+ policyGenerator.run();
+ verify(policyGenerator.policy, times(1))
+ .updatePolicy("default", clusterInfos, null);
+ verify(policyGenerator.policy, times(1))
+ .updatePolicy("default2", clusterInfos, null);
+ }
+
+ @Test
+ public void testBlacklist() throws YarnException {
+ conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST,
+ subClusterIds.get(0).toString());
+ Map<SubClusterId, Map<Class, Object>> blacklistedCMI =
+ new HashMap<>(clusterInfos);
+ blacklistedCMI.remove(subClusterIds.get(0));
+ policyGenerator = new TestablePolicyGenerator();
+ policyGenerator.policy = mock(GlobalPolicy.class);
+ policyGenerator.run();
+ verify(policyGenerator.policy, times(1))
+ .updatePolicy("default", blacklistedCMI, null);
+ verify(policyGenerator.policy, times(0))
+ .updatePolicy("default", clusterInfos, null);
+ }
+
+ @Test
+ public void testBlacklistTwo() throws YarnException {
+ conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST,
+ subClusterIds.get(0).toString() + "," + subClusterIds.get(1)
+ .toString());
+ Map<SubClusterId, Map<Class, Object>> blacklistedCMI =
+ new HashMap<>(clusterInfos);
+ blacklistedCMI.remove(subClusterIds.get(0));
+ blacklistedCMI.remove(subClusterIds.get(1));
+ policyGenerator = new TestablePolicyGenerator();
+ policyGenerator.policy = mock(GlobalPolicy.class);
+ policyGenerator.run();
+ verify(policyGenerator.policy, times(1))
+ .updatePolicy("default", blacklistedCMI, null);
+ verify(policyGenerator.policy, times(0))
+ .updatePolicy("default", clusterInfos, null);
+ }
+
+ @Test
+ public void testExistingPolicy() throws YarnException {
+ WeightedLocalityPolicyManager manager = new WeightedLocalityPolicyManager();
+ // Add a test policy for test queue
+ manager.setQueue("default");
+ manager.getWeightedPolicyInfo().setAMRMPolicyWeights(GPGUtils
+ .createUniformWeights(new HashSet<SubClusterId>(subClusterIds)));
+ manager.getWeightedPolicyInfo().setRouterPolicyWeights(GPGUtils
+ .createUniformWeights(new HashSet<SubClusterId>(subClusterIds)));
+ SubClusterPolicyConfiguration testConf = manager.serializeConf();
+ when(stateStore.getPolicyConfiguration(
+ GetSubClusterPolicyConfigurationRequest.newInstance("default")))
+ .thenReturn(
+ GetSubClusterPolicyConfigurationResponse.newInstance(testConf));
+
+ policyGenerator = new TestablePolicyGenerator();
+ policyGenerator.policy = mock(GlobalPolicy.class);
+ policyGenerator.run();
+
+ ArgumentCaptor<FederationPolicyManager> argCaptor =
+ ArgumentCaptor.forClass(FederationPolicyManager.class);
+ verify(policyGenerator.policy, times(1))
+ .updatePolicy(eq("default"), eq(clusterInfos), argCaptor.capture());
+ assertEquals(argCaptor.getValue().getClass(), manager.getClass());
+ assertEquals(argCaptor.getValue().serializeConf(), manager.serializeConf());
+ }
+
+ @Test
+ public void testCallRM() {
+
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+
+ final String a = CapacitySchedulerConfiguration.ROOT + ".a";
+ final String b = CapacitySchedulerConfiguration.ROOT + ".b";
+ final String a1 = a + ".a1";
+ final String a2 = a + ".a2";
+ final String b1 = b + ".b1";
+ final String b2 = b + ".b2";
+ final String b3 = b + ".b3";
+ float aCapacity = 10.5f;
+ float bCapacity = 89.5f;
+ float a1Capacity = 30;
+ float a2Capacity = 70;
+ float b1Capacity = 79.2f;
+ float b2Capacity = 0.8f;
+ float b3Capacity = 20;
+
+ // Define top-level queues
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] {"a", "b"});
+
+ csConf.setCapacity(a, aCapacity);
+ csConf.setCapacity(b, bCapacity);
+
+ // Define 2nd-level queues
+ csConf.setQueues(a, new String[] {"a1", "a2"});
+ csConf.setCapacity(a1, a1Capacity);
+ csConf.setUserLimitFactor(a1, 100.0f);
+ csConf.setCapacity(a2, a2Capacity);
+ csConf.setUserLimitFactor(a2, 100.0f);
+
+ csConf.setQueues(b, new String[] {"b1", "b2", "b3"});
+ csConf.setCapacity(b1, b1Capacity);
+ csConf.setUserLimitFactor(b1, 100.0f);
+ csConf.setCapacity(b2, b2Capacity);
+ csConf.setUserLimitFactor(b2, 100.0f);
+ csConf.setCapacity(b3, b3Capacity);
+ csConf.setUserLimitFactor(b3, 100.0f);
+
+ YarnConfiguration rmConf = new YarnConfiguration(csConf);
+
+ ResourceManager resourceManager = new ResourceManager();
+ rmConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ resourceManager.init(rmConf);
+ resourceManager.start();
+
+ String rmAddress = WebAppUtils.getRMWebAppURLWithScheme(this.conf);
+ SchedulerTypeInfo sti = GPGUtils
+ .invokeRMWebService(conf, rmAddress, RMWSConsts.SCHEDULER,
+ SchedulerTypeInfo.class);
+
+ Assert.assertNotNull(sti);
+ }
+
+ /**
+ * Testable policy generator overrides the methods that communicate
+ * with the RM REST endpoint, allowing us to inject faked responses.
+ */
+ class TestablePolicyGenerator extends PolicyGenerator {
+
+ TestablePolicyGenerator() {
+ super(conf, gpgContext);
+ }
+
+ @Override
+ protected Map<SubClusterId, Map<Class, Object>> getInfos(
+ Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+ Map<SubClusterId, Map<Class, Object>> ret = new HashMap<>();
+ for (SubClusterId id : activeSubClusters.keySet()) {
+ if (!ret.containsKey(id)) {
+ ret.put(id, new HashMap<Class, Object>());
+ }
+ ret.get(id).put(ClusterMetricsInfo.class,
+ clusterInfos.get(id).get(ClusterMetricsInfo.class));
+ }
+ return ret;
+ }
+
+ @Override
+ protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
+ Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+ Map<SubClusterId, SchedulerInfo> ret =
+ new HashMap<SubClusterId, SchedulerInfo>();
+ for (SubClusterId id : activeSubClusters.keySet()) {
+ ret.put(id, schedulerInfos.get(id));
+ }
+ return ret;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json
new file mode 100644
index 0000000..3ad4594
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json
@@ -0,0 +1,134 @@
+{
+ "capacity": 100.0,
+ "usedCapacity": 0.0,
+ "maxCapacity": 100.0,
+ "queueName": "root",
+ "queues": {
+ "queue": [
+ {
+ "type": "capacitySchedulerLeafQueueInfo",
+ "capacity": 100.0,
+ "usedCapacity": 0.0,
+ "maxCapacity": 100.0,
+ "absoluteCapacity": 100.0,
+ "absoluteMaxCapacity": 100.0,
+ "absoluteUsedCapacity": 0.0,
+ "numApplications": 484,
+ "queueName": "default",
+ "state": "RUNNING",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "hideReservationQueues": false,
+ "nodeLabels": [
+ "*"
+ ],
+ "numActiveApplications": 484,
+ "numPendingApplications": 0,
+ "numContainers": 0,
+ "maxApplications": 10000,
+ "maxApplicationsPerUser": 10000,
+ "userLimit": 100,
+ "users": {
+ "user": [
+ {
+ "username": "Default",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "numPendingApplications": 0,
+ "numActiveApplications": 468,
+ "AMResourceUsed": {
+ "memory": 30191616,
+ "vCores": 468
+ },
+ "userResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ }
+ }
+ ]
+ },
+ "userLimitFactor": 1.0,
+ "AMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "usedAMResource": {
+ "memory": 30388224,
+ "vCores": 532
+ },
+ "userAMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "preemptionDisabled": true
+ }
+ ]
+ },
+ "health": {
+ "lastrun": 1517951638085,
+ "operationsInfo": {
+ "entry": {
+ "key": "last-allocation",
+ "value": {
+ "nodeId": "node0:0",
+ "containerId": "container_e61477_1517922128312_0340_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-reservation",
+ "value": {
+ "nodeId": "node0:1",
+ "containerId": "container_e61477_1517879828320_0249_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-release",
+ "value": {
+ "nodeId": "node0:2",
+ "containerId": "container_e61477_1517922128312_0340_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-preemption",
+ "value": {
+ "nodeId": "N/A",
+ "containerId": "N/A",
+ "queue": "N/A"
+ }
+ }
+ },
+ "lastRunDetails": [
+ {
+ "operation": "releases",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ },
+ {
+ "operation": "allocations",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ },
+ {
+ "operation": "reservations",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org