You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2018/08/02 17:18:13 UTC

[50/50] [abbrv] hadoop git commit: YARN-7707. [GPG] Policy generator framework. Contributed by Young Chen

YARN-7707. [GPG] Policy generator framework. Contributed by Young Chen


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6800cf70
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6800cf70
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6800cf70

Branch: refs/heads/YARN-7402
Commit: 6800cf7015d81cc0085ad0f9159e246842e72187
Parents: f833e1b
Author: Botong Huang <bo...@apache.org>
Authored: Fri Mar 23 17:07:10 2018 -0700
Committer: Botong Huang <bo...@apache.org>
Committed: Thu Aug 2 09:59:48 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  36 +-
 .../src/main/resources/yarn-default.xml         |  40 +++
 .../utils/FederationStateStoreFacade.java       |  13 +
 .../pom.xml                                     |  18 +
 .../globalpolicygenerator/GPGContext.java       |   4 +
 .../globalpolicygenerator/GPGContextImpl.java   |  10 +
 .../globalpolicygenerator/GPGPolicyFacade.java  | 220 ++++++++++++
 .../server/globalpolicygenerator/GPGUtils.java  |  80 +++++
 .../GlobalPolicyGenerator.java                  |  17 +
 .../policygenerator/GlobalPolicy.java           |  76 +++++
 .../policygenerator/NoOpGlobalPolicy.java       |  36 ++
 .../policygenerator/PolicyGenerator.java        | 261 ++++++++++++++
 .../UniformWeightedLocalityGlobalPolicy.java    |  71 ++++
 .../policygenerator/package-info.java           |  24 ++
 .../TestGPGPolicyFacade.java                    | 202 +++++++++++
 .../policygenerator/TestPolicyGenerator.java    | 338 +++++++++++++++++++
 .../src/test/resources/schedulerInfo1.json      | 134 ++++++++
 .../src/test/resources/schedulerInfo2.json      | 196 +++++++++++
 18 files changed, 1775 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index ec88411..61535fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3342,7 +3342,7 @@ public class YarnConfiguration extends Configuration {
   public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
       false;
 
-  private static final String FEDERATION_GPG_PREFIX =
+  public static final String FEDERATION_GPG_PREFIX =
       FEDERATION_PREFIX + "gpg.";
 
   // The number of threads to use for the GPG scheduled executor service
@@ -3360,6 +3360,40 @@ public class YarnConfiguration extends Configuration {
       FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms";
   public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000;
 
+  public static final String FEDERATION_GPG_POLICY_PREFIX =
+      FEDERATION_GPG_PREFIX + "policy.generator.";
+
+  /** The interval at which the policy generator runs, default is one hour. */
+  public static final String GPG_POLICY_GENERATOR_INTERVAL_MS =
+      FEDERATION_GPG_POLICY_PREFIX + "interval-ms";
+  public static final long DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS = -1;
+
+  /**
+   * The configured policy generator class, runs NoOpGlobalPolicy by
+   * default.
+   */
+  public static final String GPG_GLOBAL_POLICY_CLASS =
+      FEDERATION_GPG_POLICY_PREFIX + "class";
+  public static final String DEFAULT_GPG_GLOBAL_POLICY_CLASS =
+      "org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator."
+          + "NoOpGlobalPolicy";
+
+  /**
+   * Whether or not the policy generator is running in read only (won't modify
+   * policies), default is false.
+   */
+  public static final String GPG_POLICY_GENERATOR_READONLY =
+      FEDERATION_GPG_POLICY_PREFIX + "readonly";
+  public static final boolean DEFAULT_GPG_POLICY_GENERATOR_READONLY =
+      false;
+
+  /**
+   * Which sub-clusters the policy generator should blacklist.
+   */
+  public static final String GPG_POLICY_GENERATOR_BLACKLIST =
+      FEDERATION_GPG_POLICY_PREFIX + "blacklist";
+
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 66493f3..755f3e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3557,6 +3557,46 @@
 
   <property>
     <description>
+      The interval at which the policy generator runs, default is one hour
+    </description>
+    <name>yarn.federation.gpg.policy.generator.interval-ms</name>
+    <value>3600000</value>
+  </property>
+
+  <property>
+    <description>
+      The configured policy generator class, runs NoOpGlobalPolicy by default
+    </description>
+    <name>yarn.federation.gpg.policy.generator.class</name>
+    <value>org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.NoOpGlobalPolicy</value>
+  </property>
+
+  <property>
+    <description>
+      Whether or not the policy generator is running in read only (won't modify policies), default is false
+    </description>
+    <name>yarn.federation.gpg.policy.generator.readonly</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      Whether or not the policy generator is running in read only (won't modify policies), default is false
+    </description>
+    <name>yarn.federation.gpg.policy.generator.readonly</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      Which subclusters the gpg should blacklist, default is none
+    </description>
+    <name>yarn.federation.gpg.policy.generator.blacklist</name>
+    <value></value>
+  </property>
+
+  <property>
+    <description>
        It is TimelineClient 1.5 configuration whether to store active
        application’s timeline data with in user directory i.e
        ${yarn.timeline-service.entity-group-fs-store.active-dir}/${user.name}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 4c3bed0..25a9e52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolic
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -373,6 +374,18 @@ public final class FederationStateStoreFacade {
   }
 
   /**
+   * Set a policy configuration into the state store.
+   *
+   * @param policyConf the policy configuration to set
+   * @throws YarnException if the request is invalid/fails
+   */
+  public void setPolicyConfiguration(SubClusterPolicyConfiguration policyConf)
+      throws YarnException {
+    stateStore.setPolicyConfiguration(
+        SetSubClusterPolicyConfigurationRequest.newInstance(policyConf));
+  }
+
+  /**
    * Adds the home {@link SubClusterId} for the specified {@link ApplicationId}.
    *
    * @param appHomeSubCluster the mapping of the application to it's home

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
index 9bbb936..9398b0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
@@ -63,6 +63,12 @@
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
     </dependency>
 
@@ -73,6 +79,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-common</artifactId>
       <type>test-jar</type>
@@ -92,6 +104,12 @@
       <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/schedulerInfo1.json</exclude>
+            <exclude>src/test/resources/schedulerInfo2.json</exclude>
+          </excludes>
+        </configuration>
       </plugin>
     </plugins>
   </build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
index da8a383..6b0a5a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
@@ -28,4 +28,8 @@ public interface GPGContext {
   FederationStateStoreFacade getStateStoreFacade();
 
   void setStateStoreFacade(FederationStateStoreFacade facade);
+
+  GPGPolicyFacade getPolicyFacade();
+
+  void setPolicyFacade(GPGPolicyFacade facade);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
index 3884ace..bb49844 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
 public class GPGContextImpl implements GPGContext {
 
   private FederationStateStoreFacade facade;
+  private GPGPolicyFacade policyFacade;
 
   @Override
   public FederationStateStoreFacade getStateStoreFacade() {
@@ -38,4 +39,13 @@ public class GPGContextImpl implements GPGContext {
     this.facade = federationStateStoreFacade;
   }
 
+  @Override
+  public GPGPolicyFacade getPolicyFacade(){
+    return policyFacade;
+  }
+
+  @Override
+  public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){
+    policyFacade = gpgPolicyfacade;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java
new file mode 100644
index 0000000..4c61a14
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A utility class for the GPG Policy Generator to read and write policies
+ * into the FederationStateStore. Policy specific logic is abstracted away in
+ * this class, so the PolicyGenerator can avoid dealing with policy
+ * construction, reinitialization, and serialization.
+ *
+ * There are only two exposed methods:
+ *
+ * {@link #getPolicyManager(String)}
+ * Gets the PolicyManager via queue name. Null if there is no policy
+ * configured for the specified queue. The PolicyManager can be used to
+ * extract the {@link FederationRouterPolicy} and
+ * {@link FederationAMRMProxyPolicy}, as well as any policy specific parameters
+ *
+ * {@link #setPolicyManager(FederationPolicyManager)}
+ * Sets the PolicyManager. If the policy configuration is the same, no change
+ * occurs. Otherwise, the internal cache is updated and the new configuration
+ * is written into the FederationStateStore
+ *
+ * This class assumes that the GPG is the only service
+ * writing policies. Thus, the only FederationStateStore reads occur the first
+ * time a queue policy is retrieved - after that, the GPG only writes to the
+ * FederationStateStore.
+ *
+ * The class uses a PolicyManager cache and a SubClusterPolicyConfiguration
+ * cache. The primary use for these caches are to serve reads, and to
+ * identify when the PolicyGenerator has actually changed the policy
+ * so unnecessary FederationStateStore policy writes can be avoided.
+ */
+
+public class GPGPolicyFacade {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GPGPolicyFacade.class);
+
+  private FederationStateStoreFacade stateStore;
+
+  private Map<String, FederationPolicyManager> policyManagerMap;
+  private Map<String, SubClusterPolicyConfiguration> policyConfMap;
+
+  private boolean readOnly;
+
+  public GPGPolicyFacade(FederationStateStoreFacade stateStore,
+      Configuration conf) {
+    this.stateStore = stateStore;
+    this.policyManagerMap = new HashMap<>();
+    this.policyConfMap = new HashMap<>();
+    this.readOnly =
+        conf.getBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY,
+            YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_READONLY);
+  }
+
+  /**
+   * Provides a utility for the policy generator to read the policy manager
+   * from the FederationStateStore. Because the policy generator should be the
+   * only component updating the policy, this implementation does not use the
+   * reinitialization feature.
+   *
+   * @param queueName the name of the queue we want the policy manager for.
+   * @return the policy manager responsible for the queue policy.
+   */
+  public FederationPolicyManager getPolicyManager(String queueName)
+      throws YarnException {
+    FederationPolicyManager policyManager = policyManagerMap.get(queueName);
+    // If we don't have the policy manager cached, pull configuration
+    // from the FederationStateStore to create and cache it
+    if (policyManager == null) {
+      try {
+        // If we don't have the configuration cached, pull it
+        // from the stateStore
+        SubClusterPolicyConfiguration conf = policyConfMap.get(queueName);
+        if (conf == null) {
+          conf = stateStore.getPolicyConfiguration(queueName);
+        }
+        // If configuration is still null, it does not exist in the
+        // FederationStateStore
+        if (conf == null) {
+          LOG.info("Read null policy for queue {}", queueName);
+          return null;
+        }
+        policyManager =
+            FederationPolicyUtils.instantiatePolicyManager(conf.getType());
+        policyManager.setQueue(queueName);
+
+        // TODO there is currently no way to cleanly deserialize a policy
+        // manager sub type from just the configuration
+        if (policyManager instanceof WeightedLocalityPolicyManager) {
+          WeightedPolicyInfo wpinfo =
+              WeightedPolicyInfo.fromByteBuffer(conf.getParams());
+          WeightedLocalityPolicyManager wlpmanager =
+              (WeightedLocalityPolicyManager) policyManager;
+          LOG.info("Updating policy for queue {} to configured weights router: "
+                  + "{}, amrmproxy: {}", queueName,
+              wpinfo.getRouterPolicyWeights(),
+              wpinfo.getAMRMPolicyWeights());
+          wlpmanager.setWeightedPolicyInfo(wpinfo);
+        } else {
+          LOG.warn("Warning: FederationPolicyManager of unsupported type {}, "
+              + "initialization may be incomplete ", policyManager.getClass());
+        }
+
+        policyManagerMap.put(queueName, policyManager);
+        policyConfMap.put(queueName, conf);
+      } catch (YarnException e) {
+        LOG.error("Error reading SubClusterPolicyConfiguration from state "
+            + "store for queue: {}", queueName);
+        throw e;
+      }
+    }
+    return policyManager;
+  }
+
+  /**
+   * Provides a utility for the policy generator to write a policy manager
+   * into the FederationStateStore. The facade keeps a cache and will only write
+   * into the FederationStateStore if the policy configuration has changed.
+   *
+   * @param policyManager The policy manager we want to update into the state
+   *                      store. It contains policy information as well as
+   *                      the queue name we will update for.
+   */
+  public void setPolicyManager(FederationPolicyManager policyManager)
+      throws YarnException {
+    if (policyManager == null) {
+      LOG.warn("Attempting to set null policy manager");
+      return;
+    }
+    // Extract the configuration from the policy manager
+    String queue = policyManager.getQueue();
+    SubClusterPolicyConfiguration conf;
+    try {
+      conf = policyManager.serializeConf();
+    } catch (FederationPolicyInitializationException e) {
+      LOG.warn("Error serializing policy for queue {}", queue);
+      throw e;
+    }
+    if (conf == null) {
+      // State store does not currently support setting a policy back to null
+      // because it reads the queue name to set from the policy!
+      LOG.warn("Skip setting policy to null for queue {} into state store",
+          queue);
+      return;
+    }
+    // Compare with configuration cache, if different, write the conf into
+    // store and update our conf and manager cache
+    if (!confCacheEqual(queue, conf)) {
+      try {
+        if (readOnly) {
+          LOG.info("[read-only] Skipping policy update for queue {}", queue);
+          return;
+        }
+        LOG.info("Updating policy for queue {} into state store", queue);
+        stateStore.setPolicyConfiguration(conf);
+        policyConfMap.put(queue, conf);
+        policyManagerMap.put(queue, policyManager);
+      } catch (YarnException e) {
+        LOG.warn("Error writing SubClusterPolicyConfiguration to state "
+            + "store for queue: {}", queue);
+        throw e;
+      }
+    } else {
+      LOG.info("Setting unchanged policy - state store write skipped");
+    }
+  }
+
+  /**
+   * @param queue the queue to check the cached policy configuration for
+   * @param conf the new policy configuration
+   * @return whether or not the conf is equal to the cached conf
+   */
+  private boolean confCacheEqual(String queue,
+      SubClusterPolicyConfiguration conf) {
+    SubClusterPolicyConfiguration cachedConf = policyConfMap.get(queue);
+    if (conf == null && cachedConf == null) {
+      return true;
+    } else if (conf != null && cachedConf != null) {
+      if (conf.equals(cachedConf)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
new file mode 100644
index 0000000..429bec4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+
+/**
+ * GPGUtils contains utility functions for the GPG.
+ *
+ */
+public final class GPGUtils {
+
+  // hide constructor
+  private GPGUtils() {
+  }
+
+  /**
+   * Performs an invocation of the the remote RMWebService.
+   */
+  public static <T> T invokeRMWebService(Configuration conf, String webAddr,
+      String path, final Class<T> returnType) {
+    Client client = Client.create();
+    T obj = null;
+
+    WebResource webResource = client.resource(webAddr);
+    ClientResponse response = webResource.path("ws/v1/cluster").path(path)
+        .accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
+    if (response.getStatus() == HttpServletResponse.SC_OK) {
+      obj = response.getEntity(returnType);
+    } else {
+      throw new YarnRuntimeException("Bad response from remote web service: "
+          + response.getStatus());
+    }
+    return obj;
+  }
+
+  /**
+   * Creates a uniform weighting of 1.0 for each sub cluster.
+   */
+  public static Map<SubClusterIdInfo, Float> createUniformWeights(
+      Set<SubClusterId> ids) {
+    Map<SubClusterIdInfo, Float> weights =
+        new HashMap<>();
+    for(SubClusterId id : ids) {
+      weights.put(new SubClusterIdInfo(id), 1.0f);
+    }
+    return weights;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index f6cfba0..88b9f2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
 import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +63,7 @@ public class GlobalPolicyGenerator extends CompositeService {
   // Scheduler service that runs tasks periodically
   private ScheduledThreadPoolExecutor scheduledExecutorService;
   private SubClusterCleaner subClusterCleaner;
+  private PolicyGenerator policyGenerator;
 
   public GlobalPolicyGenerator() {
     super(GlobalPolicyGenerator.class.getName());
@@ -73,11 +75,15 @@ public class GlobalPolicyGenerator extends CompositeService {
     // Set up the context
     this.gpgContext
         .setStateStoreFacade(FederationStateStoreFacade.getInstance());
+    this.gpgContext
+        .setPolicyFacade(new GPGPolicyFacade(
+            this.gpgContext.getStateStoreFacade(), conf));
 
     this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
         conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
             YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
     this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
+    this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);
 
     DefaultMetricsSystem.initialize(METRICS_NAME);
 
@@ -99,6 +105,17 @@ public class GlobalPolicyGenerator extends CompositeService {
       LOG.info("Scheduled sub-cluster cleaner with interval: {}",
           DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
     }
+
+    // Schedule PolicyGenerator
+    long policyGeneratorIntervalMillis = getConfig().getLong(
+        YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS);
+    if(policyGeneratorIntervalMillis > 0){
+      this.scheduledExecutorService.scheduleAtFixedRate(this.policyGenerator,
+          0, policyGeneratorIntervalMillis, TimeUnit.MILLISECONDS);
+      LOG.info("Scheduled policygenerator with interval: {}",
+          DurationFormatUtils.formatDurationISO(policyGeneratorIntervalMillis));
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
new file mode 100644
index 0000000..38d762d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * This interface defines the plug-able policy that the PolicyGenerator uses
+ * to update policies into the state store.
+ */
+
+public abstract class GlobalPolicy implements Configurable {
+
+  private Configuration conf;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Return a map of the object type and RM path to request it from - the
+   * framework will query these paths and provide the objects to the policy.
+   * Delegating this responsibility to the PolicyGenerator enables us to avoid
+   * duplicate calls to the same * endpoints as the GlobalPolicy is invoked
+   * once per queue.
+   */
+  protected Map<Class, String> registerPaths() {
+    // Default register nothing
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Given a queue, cluster metrics, and policy manager, update the policy
+   * to account for the cluster status. This method defines the policy generator
+   * behavior.
+   *
+   * @param queueName   name of the queue
+   * @param clusterInfo subClusterId map to cluster information about the
+   *                    SubCluster used to make policy decisions
+   * @param manager     the FederationPolicyManager for the queue's existing
+   *                    policy the manager may be null, in which case the policy
+   *                    will need to be created
+   * @return policy manager that handles the updated (or created) policy
+   */
+  protected abstract FederationPolicyManager updatePolicy(String queueName,
+      Map<SubClusterId, Map<Class, Object>> clusterInfo,
+      FederationPolicyManager manager);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java
new file mode 100644
index 0000000..c2d578f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+import java.util.Map;
+
+/**
+ * Default policy that does not update any policy configurations.
+ */
+public class NoOpGlobalPolicy extends GlobalPolicy{
+
+  @Override
+  public FederationPolicyManager updatePolicy(String queueName,
+      Map<SubClusterId, Map<Class, Object>> clusterInfo,
+      FederationPolicyManager manager) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
new file mode 100644
index 0000000..5681ff0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The PolicyGenerator runs periodically and updates the policy configuration
+ * for each queue into the FederationStateStore. The policy update behavior is
+ * defined by the GlobalPolicy instance that is used.
+ */
+
+public class PolicyGenerator implements Runnable, Configurable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PolicyGenerator.class);
+
+  private GPGContext gpgContext;
+  private Configuration conf;
+
+  // Information request map
+  private Map<Class, String> pathMap = new HashMap<>();
+
+  // Global policy instance
+  @VisibleForTesting
+  protected GlobalPolicy policy;
+
+  /**
+   * The PolicyGenerator periodically reads SubCluster load and updates
+   * policies into the FederationStateStore.
+   */
+  public PolicyGenerator(Configuration conf, GPGContext context) {
+    setConf(conf);
+    init(context);
+  }
+
+  private void init(GPGContext context) {
+    this.gpgContext = context;
+    LOG.info("Initialized PolicyGenerator");
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    this.policy = FederationStateStoreFacade
+        .createInstance(conf, YarnConfiguration.GPG_GLOBAL_POLICY_CLASS,
+            YarnConfiguration.DEFAULT_GPG_GLOBAL_POLICY_CLASS,
+            GlobalPolicy.class);
+    policy.setConf(conf);
+    pathMap.putAll(policy.registerPaths());
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public final void run() {
+    Map<SubClusterId, SubClusterInfo> activeSubClusters;
+    try {
+      activeSubClusters = gpgContext.getStateStoreFacade().getSubClusters(true);
+    } catch (YarnException e) {
+      LOG.error("Error retrieving active sub-clusters", e);
+      return;
+    }
+
+    // Parse the scheduler information from all the SCs
+    Map<SubClusterId, SchedulerInfo> schedInfo =
+        getSchedulerInfo(activeSubClusters);
+
+    // Extract and enforce that all the schedulers have matching type
+    Set<String> queueNames = extractQueues(schedInfo);
+
+    // Remove black listed SubClusters
+    activeSubClusters.keySet().removeAll(getBlackList());
+    LOG.info("Active non-blacklist sub-clusters: {}",
+        activeSubClusters.keySet());
+
+    // Get cluster metrics information from non black listed RMs - later used
+    // to evaluate SubCluster load
+    Map<SubClusterId, Map<Class, Object>> clusterInfo =
+        getInfos(activeSubClusters);
+
+    // Update into the FederationStateStore
+    for (String queueName : queueNames) {
+      // Retrieve the manager from the policy facade
+      FederationPolicyManager manager;
+      try {
+        manager = this.gpgContext.getPolicyFacade().getPolicyManager(queueName);
+      } catch (YarnException e) {
+        LOG.error("GetPolicy for queue {} failed", queueName, e);
+        continue;
+      }
+      LOG.info("Updating policy for queue {}", queueName);
+      manager = policy.updatePolicy(queueName, clusterInfo, manager);
+      try {
+        this.gpgContext.getPolicyFacade().setPolicyManager(manager);
+      } catch (YarnException e) {
+        LOG.error("SetPolicy for queue {} failed", queueName, e);
+      }
+    }
+  }
+
+  /**
+   * Helper to retrieve metrics from the RM REST endpoints.
+   *
+   * @param activeSubClusters A map of active SubCluster IDs to info
+   */
+  @VisibleForTesting
+  protected Map<SubClusterId, Map<Class, Object>> getInfos(
+      Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+
+    Map<SubClusterId, Map<Class, Object>> clusterInfo = new HashMap<>();
+    for (SubClusterInfo sci : activeSubClusters.values()) {
+      for (Map.Entry<Class, String> e : this.pathMap.entrySet()) {
+        if (!clusterInfo.containsKey(sci.getSubClusterId())) {
+          clusterInfo.put(sci.getSubClusterId(), new HashMap<Class, Object>());
+        }
+        Object ret = GPGUtils
+            .invokeRMWebService(conf, sci.getRMWebServiceAddress(),
+                e.getValue(), e.getKey());
+        clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret);
+      }
+    }
+
+    return clusterInfo;
+  }
+
+  /**
+   * Helper to retrieve SchedulerInfos.
+   *
+   * @param activeSubClusters A map of active SubCluster IDs to info
+   */
+  @VisibleForTesting
+  protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
+      Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+    Map<SubClusterId, SchedulerInfo> schedInfo =
+        new HashMap<>();
+    for (SubClusterInfo sci : activeSubClusters.values()) {
+      SchedulerTypeInfo sti = GPGUtils
+          .invokeRMWebService(conf, sci.getRMWebServiceAddress(),
+              RMWSConsts.SCHEDULER, SchedulerTypeInfo.class);
+      if(sti != null){
+        schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo());
+      } else {
+        LOG.warn("Skipped null scheduler info from SubCluster " + sci
+            .getSubClusterId().toString());
+      }
+    }
+    return schedInfo;
+  }
+
+  /**
+   * Helper to get a set of blacklisted SubCluster Ids from configuration.
+   */
+  private Set<SubClusterId> getBlackList() {
+    String blackListParam =
+        conf.get(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST);
+    if(blackListParam == null){
+      return Collections.emptySet();
+    }
+    Set<SubClusterId> blackList = new HashSet<>();
+    for (String id : blackListParam.split(",")) {
+      blackList.add(SubClusterId.newInstance(id));
+    }
+    return blackList;
+  }
+
+  /**
+   * Given the scheduler information for all RMs, extract the union of
+   * queue names - right now we only consider instances of capacity scheduler.
+   *
+   * @param schedInfo the scheduler information
+   * @return a set of queue names
+   */
+  private Set<String> extractQueues(
+      Map<SubClusterId, SchedulerInfo> schedInfo) {
+    Set<String> queueNames = new HashSet<String>();
+    for (Map.Entry<SubClusterId, SchedulerInfo> entry : schedInfo.entrySet()) {
+      if (entry.getValue() instanceof CapacitySchedulerInfo) {
+        // Flatten the queue structure and get only non leaf queues
+        queueNames.addAll(flattenQueue((CapacitySchedulerInfo) entry.getValue())
+            .get(CapacitySchedulerQueueInfo.class));
+      } else {
+        LOG.warn("Skipping SubCluster {}, not configured with capacity "
+            + "scheduler", entry.getKey());
+      }
+    }
+    return queueNames;
+  }
+
+  // Helpers to flatten the queue structure into a multimap of
+  // queue type to set of queue names
+  private Map<Class, Set<String>> flattenQueue(CapacitySchedulerInfo csi) {
+    Map<Class, Set<String>> flattened = new HashMap<Class, Set<String>>();
+    addOrAppend(flattened, csi.getClass(), csi.getQueueName());
+    for (CapacitySchedulerQueueInfo csqi : csi.getQueues().getQueueInfoList()) {
+      flattenQueue(csqi, flattened);
+    }
+    return flattened;
+  }
+
+  private void flattenQueue(CapacitySchedulerQueueInfo csi,
+      Map<Class, Set<String>> flattened) {
+    addOrAppend(flattened, csi.getClass(), csi.getQueueName());
+    if (csi.getQueues() != null) {
+      for (CapacitySchedulerQueueInfo csqi : csi.getQueues()
+          .getQueueInfoList()) {
+        flattenQueue(csqi, flattened);
+      }
+    }
+  }
+
+  private <K, V> void addOrAppend(Map<K, Set<V>> multimap, K key, V value) {
+    if (!multimap.containsKey(key)) {
+      multimap.put(key, new HashSet<V>());
+    }
+    multimap.get(key).add(value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java
new file mode 100644
index 0000000..826cb02
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.commons.math3.optim.nonlinear.vector.Weight;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Simple policy that generates and updates uniform weighted locality
+ * policies.
+ */
+public class UniformWeightedLocalityGlobalPolicy extends GlobalPolicy{
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(UniformWeightedLocalityGlobalPolicy.class);
+
+  @Override
+  protected FederationPolicyManager updatePolicy(String queueName,
+      Map<SubClusterId, Map<Class, Object>> clusterInfo,
+      FederationPolicyManager currentManager){
+    if(currentManager == null){
+      // Set uniform weights for all SubClusters
+      LOG.info("Creating uniform weighted policy queue {}", queueName);
+      WeightedLocalityPolicyManager manager =
+          new WeightedLocalityPolicyManager();
+      manager.setQueue(queueName);
+      Map<SubClusterIdInfo, Float> policyWeights =
+          GPGUtils.createUniformWeights(clusterInfo.keySet());
+      manager.getWeightedPolicyInfo().setAMRMPolicyWeights(policyWeights);
+      manager.getWeightedPolicyInfo().setRouterPolicyWeights(policyWeights);
+      currentManager = manager;
+    }
+    if(currentManager instanceof WeightedLocalityPolicyManager){
+      LOG.info("Updating policy for queue {} to default weights", queueName);
+      WeightedLocalityPolicyManager wlpmanager =
+          (WeightedLocalityPolicyManager) currentManager;
+      wlpmanager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+          GPGUtils.createUniformWeights(clusterInfo.keySet()));
+      wlpmanager.getWeightedPolicyInfo().setRouterPolicyWeights(
+          GPGUtils.createUniformWeights(clusterInfo.keySet()));
+    } else {
+      LOG.info("Policy for queue {} is of type {}, expected {}",
+          queueName, currentManager.getClass(), Weight.class);
+    }
+    return currentManager;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java
new file mode 100644
index 0000000..e8ff436
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java
@@ -0,0 +1,24 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes comprising the policy generator for the GPG. Responsibilities include
+ * generating and updating policies based on the cluster status.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java
new file mode 100644
index 0000000..d78c11f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Unit test for GPG Policy Facade.
+ */
+public class TestGPGPolicyFacade {
+
+  private Configuration conf;
+  private FederationStateStore stateStore;
+  private FederationStateStoreFacade facade =
+      FederationStateStoreFacade.getInstance();
+  private GPGPolicyFacade policyFacade;
+
+  private Set<SubClusterId> subClusterIds;
+
+  private SubClusterPolicyConfiguration testConf;
+
+  private static final String TEST_QUEUE = "test-queue";
+
+  public TestGPGPolicyFacade() {
+    conf = new Configuration();
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+    subClusterIds = new HashSet<>();
+    subClusterIds.add(SubClusterId.newInstance("sc0"));
+    subClusterIds.add(SubClusterId.newInstance("sc1"));
+    subClusterIds.add(SubClusterId.newInstance("sc2"));
+  }
+
+  @Before
+  public void setUp() throws IOException, YarnException {
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(conf);
+    facade.reinitialize(stateStore, conf);
+    policyFacade = new GPGPolicyFacade(facade, conf);
+    WeightedLocalityPolicyManager manager =
+        new WeightedLocalityPolicyManager();
+    // Add a test policy for test queue
+    manager.setQueue(TEST_QUEUE);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    testConf = manager.serializeConf();
+    stateStore.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest
+        .newInstance(testConf));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    stateStore.close();
+    stateStore = null;
+  }
+
+  @Test
+  public void testGetPolicy() throws YarnException {
+    WeightedLocalityPolicyManager manager =
+        (WeightedLocalityPolicyManager) policyFacade
+            .getPolicyManager(TEST_QUEUE);
+    Assert.assertEquals(testConf, manager.serializeConf());
+  }
+
+  /**
+   * Test that new policies are written into the state store.
+   */
+  @Test
+  public void testSetNewPolicy() throws YarnException {
+    WeightedLocalityPolicyManager manager =
+        new WeightedLocalityPolicyManager();
+    manager.setQueue(TEST_QUEUE + 0);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    SubClusterPolicyConfiguration policyConf = manager.serializeConf();
+    policyFacade.setPolicyManager(manager);
+
+    manager =
+        (WeightedLocalityPolicyManager) policyFacade
+            .getPolicyManager(TEST_QUEUE + 0);
+    Assert.assertEquals(policyConf, manager.serializeConf());
+  }
+
+  /**
+   * Test that overwriting policies are updated in the state store.
+   */
+  @Test
+  public void testOverwritePolicy() throws YarnException {
+    subClusterIds.add(SubClusterId.newInstance("sc3"));
+    WeightedLocalityPolicyManager manager =
+        new WeightedLocalityPolicyManager();
+    manager.setQueue(TEST_QUEUE);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    SubClusterPolicyConfiguration policyConf = manager.serializeConf();
+    policyFacade.setPolicyManager(manager);
+
+    manager =
+        (WeightedLocalityPolicyManager) policyFacade
+            .getPolicyManager(TEST_QUEUE);
+    Assert.assertEquals(policyConf, manager.serializeConf());
+  }
+
+  /**
+   * Test that the write through cache works.
+   */
+  @Test
+  public void testWriteCache() throws YarnException {
+    stateStore = mock(MemoryFederationStateStore.class);
+    facade.reinitialize(stateStore, conf);
+    when(stateStore.getPolicyConfiguration(Matchers.any(
+        GetSubClusterPolicyConfigurationRequest.class))).thenReturn(
+        GetSubClusterPolicyConfigurationResponse.newInstance(testConf));
+    policyFacade = new GPGPolicyFacade(facade, conf);
+
+    // Query once to fill the cache
+    FederationPolicyManager manager = policyFacade.getPolicyManager(TEST_QUEUE);
+    // State store should be contacted once
+    verify(stateStore, times(1)).getPolicyConfiguration(
+        Matchers.any(GetSubClusterPolicyConfigurationRequest.class));
+
+    // If we set the same policy, the state store should be untouched
+    policyFacade.setPolicyManager(manager);
+    verify(stateStore, times(0)).setPolicyConfiguration(
+        Matchers.any(SetSubClusterPolicyConfigurationRequest.class));
+  }
+
+  /**
+   * Test that when read only is enabled, the state store is not changed.
+   */
+  @Test
+  public void testReadOnly() throws YarnException {
+    conf.setBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY, true);
+    stateStore = mock(MemoryFederationStateStore.class);
+    facade.reinitialize(stateStore, conf);
+    when(stateStore.getPolicyConfiguration(Matchers.any(
+        GetSubClusterPolicyConfigurationRequest.class))).thenReturn(
+        GetSubClusterPolicyConfigurationResponse.newInstance(testConf));
+    policyFacade = new GPGPolicyFacade(facade, conf);
+
+    // If we set a policy, the state store should be untouched
+    WeightedLocalityPolicyManager manager =
+        new WeightedLocalityPolicyManager();
+    // Add a test policy for test queue
+    manager.setQueue(TEST_QUEUE);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    policyFacade.setPolicyManager(manager);
+    verify(stateStore, times(0)).setPolicyConfiguration(
+        Matchers.any(SetSubClusterPolicyConfigurationRequest.class));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
new file mode 100644
index 0000000..9d27b3b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONUnmarshaller;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGPolicyFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for GPG Policy Generator.
+ */
+public class TestPolicyGenerator {
+
+  private static final int NUM_SC = 3;
+
+  private Configuration conf;
+  private FederationStateStore stateStore;
+  private FederationStateStoreFacade facade =
+      FederationStateStoreFacade.getInstance();
+
+  private List<SubClusterId> subClusterIds;
+  private Map<SubClusterId, SubClusterInfo> subClusterInfos;
+  private Map<SubClusterId, Map<Class, Object>> clusterInfos;
+  private Map<SubClusterId, SchedulerInfo> schedulerInfos;
+
+  private GPGContext gpgContext;
+
+  private PolicyGenerator policyGenerator;
+
+  public TestPolicyGenerator() {
+    conf = new Configuration();
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+    gpgContext = new GPGContextImpl();
+    gpgContext.setPolicyFacade(new GPGPolicyFacade(facade, conf));
+    gpgContext.setStateStoreFacade(facade);
+  }
+
+  @Before
+  public void setUp() throws IOException, YarnException, JAXBException {
+    subClusterIds = new ArrayList<>();
+    subClusterInfos = new HashMap<>();
+    clusterInfos = new HashMap<>();
+    schedulerInfos = new HashMap<>();
+
+    CapacitySchedulerInfo sti1 =
+        readJSON("src/test/resources/schedulerInfo1.json",
+            CapacitySchedulerInfo.class);
+    CapacitySchedulerInfo sti2 =
+        readJSON("src/test/resources/schedulerInfo2.json",
+            CapacitySchedulerInfo.class);
+
+    // Set up sub clusters
+    for (int i = 0; i < NUM_SC; ++i) {
+      // Sub cluster Id
+      SubClusterId id = SubClusterId.newInstance("sc" + i);
+      subClusterIds.add(id);
+
+      // Sub cluster info
+      SubClusterInfo cluster = SubClusterInfo
+          .newInstance(id, "amrm:" + i, "clientrm:" + i, "rmadmin:" + i,
+              "rmweb:" + i, SubClusterState.SC_RUNNING, 0, "");
+      subClusterInfos.put(id, cluster);
+
+      // Cluster metrics info
+      ClusterMetricsInfo metricsInfo = new ClusterMetricsInfo();
+      metricsInfo.setAppsPending(2000);
+      if (!clusterInfos.containsKey(id)) {
+        clusterInfos.put(id, new HashMap<Class, Object>());
+      }
+      clusterInfos.get(id).put(ClusterMetricsInfo.class, metricsInfo);
+
+      schedulerInfos.put(id, sti1);
+    }
+
+    // Change one of the sub cluster schedulers
+    schedulerInfos.put(subClusterIds.get(0), sti2);
+
+    stateStore = mock(FederationStateStore.class);
+    when(stateStore.getSubClusters((GetSubClustersInfoRequest) any()))
+        .thenReturn(GetSubClustersInfoResponse.newInstance(
+            new ArrayList<SubClusterInfo>(subClusterInfos.values())));
+    facade.reinitialize(stateStore, conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    stateStore.close();
+    stateStore = null;
+  }
+
+  private <T> T readJSON(String pathname, Class<T> classy)
+      throws IOException, JAXBException {
+
+    JSONJAXBContext jc =
+        new JSONJAXBContext(JSONConfiguration.mapped().build(), classy);
+    JSONUnmarshaller unmarshaller = jc.createJSONUnmarshaller();
+    String contents = new String(Files.readAllBytes(Paths.get(pathname)));
+    return unmarshaller.unmarshalFromJSON(new StringReader(contents), classy);
+
+  }
+
+  @Test
+  public void testPolicyGenerator() throws YarnException {
+    policyGenerator = new TestablePolicyGenerator();
+    policyGenerator.policy = mock(GlobalPolicy.class);
+    policyGenerator.run();
+    verify(policyGenerator.policy, times(1))
+        .updatePolicy("default", clusterInfos, null);
+    verify(policyGenerator.policy, times(1))
+        .updatePolicy("default2", clusterInfos, null);
+  }
+
+  @Test
+  public void testBlacklist() throws YarnException {
+    conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST,
+        subClusterIds.get(0).toString());
+    Map<SubClusterId, Map<Class, Object>> blacklistedCMI =
+        new HashMap<>(clusterInfos);
+    blacklistedCMI.remove(subClusterIds.get(0));
+    policyGenerator = new TestablePolicyGenerator();
+    policyGenerator.policy = mock(GlobalPolicy.class);
+    policyGenerator.run();
+    verify(policyGenerator.policy, times(1))
+        .updatePolicy("default", blacklistedCMI, null);
+    verify(policyGenerator.policy, times(0))
+        .updatePolicy("default", clusterInfos, null);
+  }
+
+  @Test
+  public void testBlacklistTwo() throws YarnException {
+    conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST,
+        subClusterIds.get(0).toString() + "," + subClusterIds.get(1)
+            .toString());
+    Map<SubClusterId, Map<Class, Object>> blacklistedCMI =
+        new HashMap<>(clusterInfos);
+    blacklistedCMI.remove(subClusterIds.get(0));
+    blacklistedCMI.remove(subClusterIds.get(1));
+    policyGenerator = new TestablePolicyGenerator();
+    policyGenerator.policy = mock(GlobalPolicy.class);
+    policyGenerator.run();
+    verify(policyGenerator.policy, times(1))
+        .updatePolicy("default", blacklistedCMI, null);
+    verify(policyGenerator.policy, times(0))
+        .updatePolicy("default", clusterInfos, null);
+  }
+
+  @Test
+  public void testExistingPolicy() throws YarnException {
+    WeightedLocalityPolicyManager manager = new WeightedLocalityPolicyManager();
+    // Add a test policy for test queue
+    manager.setQueue("default");
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(GPGUtils
+        .createUniformWeights(new HashSet<SubClusterId>(subClusterIds)));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(GPGUtils
+        .createUniformWeights(new HashSet<SubClusterId>(subClusterIds)));
+    SubClusterPolicyConfiguration testConf = manager.serializeConf();
+    when(stateStore.getPolicyConfiguration(
+        GetSubClusterPolicyConfigurationRequest.newInstance("default")))
+        .thenReturn(
+            GetSubClusterPolicyConfigurationResponse.newInstance(testConf));
+
+    policyGenerator = new TestablePolicyGenerator();
+    policyGenerator.policy = mock(GlobalPolicy.class);
+    policyGenerator.run();
+
+    ArgumentCaptor<FederationPolicyManager> argCaptor =
+        ArgumentCaptor.forClass(FederationPolicyManager.class);
+    verify(policyGenerator.policy, times(1))
+        .updatePolicy(eq("default"), eq(clusterInfos), argCaptor.capture());
+    assertEquals(argCaptor.getValue().getClass(), manager.getClass());
+    assertEquals(argCaptor.getValue().serializeConf(), manager.serializeConf());
+  }
+
+  @Test
+  public void testCallRM() {
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+
+    final String a = CapacitySchedulerConfiguration.ROOT + ".a";
+    final String b = CapacitySchedulerConfiguration.ROOT + ".b";
+    final String a1 = a + ".a1";
+    final String a2 = a + ".a2";
+    final String b1 = b + ".b1";
+    final String b2 = b + ".b2";
+    final String b3 = b + ".b3";
+    float aCapacity = 10.5f;
+    float bCapacity = 89.5f;
+    float a1Capacity = 30;
+    float a2Capacity = 70;
+    float b1Capacity = 79.2f;
+    float b2Capacity = 0.8f;
+    float b3Capacity = 20;
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a", "b"});
+
+    csConf.setCapacity(a, aCapacity);
+    csConf.setCapacity(b, bCapacity);
+
+    // Define 2nd-level queues
+    csConf.setQueues(a, new String[] {"a1", "a2"});
+    csConf.setCapacity(a1, a1Capacity);
+    csConf.setUserLimitFactor(a1, 100.0f);
+    csConf.setCapacity(a2, a2Capacity);
+    csConf.setUserLimitFactor(a2, 100.0f);
+
+    csConf.setQueues(b, new String[] {"b1", "b2", "b3"});
+    csConf.setCapacity(b1, b1Capacity);
+    csConf.setUserLimitFactor(b1, 100.0f);
+    csConf.setCapacity(b2, b2Capacity);
+    csConf.setUserLimitFactor(b2, 100.0f);
+    csConf.setCapacity(b3, b3Capacity);
+    csConf.setUserLimitFactor(b3, 100.0f);
+
+    YarnConfiguration rmConf = new YarnConfiguration(csConf);
+
+    ResourceManager resourceManager = new ResourceManager();
+    rmConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    resourceManager.init(rmConf);
+    resourceManager.start();
+
+    String rmAddress = WebAppUtils.getRMWebAppURLWithScheme(this.conf);
+    SchedulerTypeInfo sti = GPGUtils
+        .invokeRMWebService(conf, rmAddress, RMWSConsts.SCHEDULER,
+            SchedulerTypeInfo.class);
+
+    Assert.assertNotNull(sti);
+  }
+
+  /**
+   * Testable policy generator overrides the methods that communicate
+   * with the RM REST endpoint, allowing us to inject faked responses.
+   */
+  class TestablePolicyGenerator extends PolicyGenerator {
+
+    TestablePolicyGenerator() {
+      super(conf, gpgContext);
+    }
+
+    @Override
+    protected Map<SubClusterId, Map<Class, Object>> getInfos(
+        Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+      Map<SubClusterId, Map<Class, Object>> ret = new HashMap<>();
+      for (SubClusterId id : activeSubClusters.keySet()) {
+        if (!ret.containsKey(id)) {
+          ret.put(id, new HashMap<Class, Object>());
+        }
+        ret.get(id).put(ClusterMetricsInfo.class,
+            clusterInfos.get(id).get(ClusterMetricsInfo.class));
+      }
+      return ret;
+    }
+
+    @Override
+    protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
+        Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+      Map<SubClusterId, SchedulerInfo> ret =
+          new HashMap<SubClusterId, SchedulerInfo>();
+      for (SubClusterId id : activeSubClusters.keySet()) {
+        ret.put(id, schedulerInfos.get(id));
+      }
+      return ret;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6800cf70/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json
new file mode 100644
index 0000000..3ad4594
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json
@@ -0,0 +1,134 @@
+{
+  "capacity": 100.0,
+  "usedCapacity": 0.0,
+  "maxCapacity": 100.0,
+  "queueName": "root",
+  "queues": {
+    "queue": [
+      {
+        "type": "capacitySchedulerLeafQueueInfo",
+        "capacity": 100.0,
+        "usedCapacity": 0.0,
+        "maxCapacity": 100.0,
+        "absoluteCapacity": 100.0,
+        "absoluteMaxCapacity": 100.0,
+        "absoluteUsedCapacity": 0.0,
+        "numApplications": 484,
+        "queueName": "default",
+        "state": "RUNNING",
+        "resourcesUsed": {
+          "memory": 0,
+          "vCores": 0
+        },
+        "hideReservationQueues": false,
+        "nodeLabels": [
+          "*"
+        ],
+        "numActiveApplications": 484,
+        "numPendingApplications": 0,
+        "numContainers": 0,
+        "maxApplications": 10000,
+        "maxApplicationsPerUser": 10000,
+        "userLimit": 100,
+        "users": {
+          "user": [
+            {
+              "username": "Default",
+              "resourcesUsed": {
+                "memory": 0,
+                "vCores": 0
+              },
+              "numPendingApplications": 0,
+              "numActiveApplications": 468,
+              "AMResourceUsed": {
+                "memory": 30191616,
+                "vCores": 468
+              },
+              "userResourceLimit": {
+                "memory": 31490048,
+                "vCores": 7612
+              }
+            }
+          ]
+        },
+        "userLimitFactor": 1.0,
+        "AMResourceLimit": {
+          "memory": 31490048,
+          "vCores": 7612
+        },
+        "usedAMResource": {
+          "memory": 30388224,
+          "vCores": 532
+        },
+        "userAMResourceLimit": {
+          "memory": 31490048,
+          "vCores": 7612
+        },
+        "preemptionDisabled": true
+      }
+    ]
+  },
+  "health": {
+    "lastrun": 1517951638085,
+    "operationsInfo": {
+      "entry": {
+        "key": "last-allocation",
+        "value": {
+          "nodeId": "node0:0",
+          "containerId": "container_e61477_1517922128312_0340_01_000001",
+          "queue": "root.default"
+        }
+      },
+      "entry": {
+        "key": "last-reservation",
+        "value": {
+          "nodeId": "node0:1",
+          "containerId": "container_e61477_1517879828320_0249_01_000001",
+          "queue": "root.default"
+        }
+      },
+      "entry": {
+        "key": "last-release",
+        "value": {
+          "nodeId": "node0:2",
+          "containerId": "container_e61477_1517922128312_0340_01_000001",
+          "queue": "root.default"
+        }
+      },
+      "entry": {
+        "key": "last-preemption",
+        "value": {
+          "nodeId": "N/A",
+          "containerId": "N/A",
+          "queue": "N/A"
+        }
+      }
+    },
+    "lastRunDetails": [
+      {
+        "operation": "releases",
+        "count": 0,
+        "resources": {
+          "memory": 0,
+          "vCores": 0
+        }
+      },
+      {
+        "operation": "allocations",
+        "count": 0,
+        "resources": {
+          "memory": 0,
+          "vCores": 0
+        }
+      },
+      {
+        "operation": "reservations",
+        "count": 0,
+        "resources": {
+          "memory": 0,
+          "vCores": 0
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org