You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/08/02 00:24:56 UTC

[14/50] [abbrv] hadoop git commit: YARN-5325. Stateless ARMRMProxy policies implementation. (Carlo Curino via Subru).

YARN-5325. Stateless ARMRMProxy policies implementation. (Carlo Curino via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/11c53365
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/11c53365
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/11c53365

Branch: refs/heads/YARN-2915
Commit: 11c5336522d3504598fb94eee288d54df73418c6
Parents: 1298127
Author: Subru Krishnan <su...@apache.org>
Authored: Thu Oct 13 17:59:13 2016 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Tue Aug 1 17:22:10 2017 -0700

----------------------------------------------------------------------
 .../AbstractConfigurableFederationPolicy.java   | 155 +++++
 .../policies/ConfigurableFederationPolicy.java  |   9 +-
 .../FederationPolicyInitializationContext.java  |  37 +-
 ...ionPolicyInitializationContextValidator.java |  28 +-
 .../policies/FederationPolicyManager.java       |  59 +-
 .../amrmproxy/AbstractAMRMProxyPolicy.java      |  47 ++
 .../amrmproxy/BroadcastAMRMProxyPolicy.java     |  85 +++
 .../amrmproxy/FederationAMRMProxyPolicy.java    |  25 +-
 .../LocalityMulticastAMRMProxyPolicy.java       | 583 +++++++++++++++++++
 .../policies/amrmproxy/package-info.java        |   1 -
 .../policies/dao/WeightedPolicyInfo.java        | 180 +++---
 .../federation/policies/dao/package-info.java   |   1 -
 .../policies/exceptions/package-info.java       |   1 -
 .../federation/policies/package-info.java       |   1 -
 .../policies/router/AbstractRouterPolicy.java   |  47 ++
 .../router/BaseWeightedRouterPolicy.java        | 150 -----
 .../policies/router/FederationRouterPolicy.java |   5 +-
 .../policies/router/LoadBasedRouterPolicy.java  |  36 +-
 .../policies/router/PriorityRouterPolicy.java   |  19 +-
 .../router/UniformRandomRouterPolicy.java       |  28 +-
 .../router/WeightedRandomRouterPolicy.java      |  32 +-
 .../policies/router/package-info.java           |   1 -
 .../resolver/AbstractSubClusterResolver.java    |   4 +-
 .../policies/BaseFederationPoliciesTest.java    |  28 +-
 ...ionPolicyInitializationContextValidator.java |  25 +-
 .../TestBroadcastAMRMProxyFederationPolicy.java | 112 ++++
 .../TestLocalityMulticastAMRMProxyPolicy.java   | 566 ++++++++++++++++++
 .../router/TestLoadBasedRouterPolicy.java       |  18 +-
 .../router/TestPriorityRouterPolicy.java        |  15 +-
 .../router/TestWeightedRandomRouterPolicy.java  |  35 +-
 .../utils/FederationPoliciesTestUtil.java       |  64 ++
 .../src/test/resources/nodes                    |   6 +-
 32 files changed, 1950 insertions(+), 453 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java
new file mode 100644
index 0000000..4cb9bbe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+/**
+ * Base abstract class for a weighted {@link ConfigurableFederationPolicy}.
+ */
+public abstract class AbstractConfigurableFederationPolicy
+    implements ConfigurableFederationPolicy {
+
+  private WeightedPolicyInfo policyInfo = null;
+  private FederationPolicyInitializationContext policyContext;
+  private boolean isDirty;
+
+  public AbstractConfigurableFederationPolicy() {
+  }
+
+  @Override
+  public void reinitialize(
+      FederationPolicyInitializationContext initializationContext)
+      throws FederationPolicyInitializationException {
+    isDirty = true;
+    FederationPolicyInitializationContextValidator
+        .validate(initializationContext, this.getClass().getCanonicalName());
+
+    // perform consistency checks
+    WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo.fromByteBuffer(
+        initializationContext.getSubClusterPolicyConfiguration().getParams());
+
+    // if nothing has changed skip the rest of initialization
+    // and signal to children that the reinit is free via isDirty var.
+    if (policyInfo != null && policyInfo.equals(newPolicyInfo)) {
+      isDirty = false;
+      return;
+    }
+
+    validate(newPolicyInfo);
+    setPolicyInfo(newPolicyInfo);
+    this.policyContext = initializationContext;
+  }
+
+  /**
+   * Overridable validation step for the policy configuration.
+   *
+   * @param newPolicyInfo the configuration to test.
+   *
+   * @throws FederationPolicyInitializationException if the configuration is not
+   *           valid.
+   */
+  public void validate(WeightedPolicyInfo newPolicyInfo)
+      throws FederationPolicyInitializationException {
+    if (newPolicyInfo == null) {
+      throw new FederationPolicyInitializationException(
+          "The policy to " + "validate should not be null.");
+    }
+  }
+
+  /**
+   * Returns true if the last reinitialization required actual changes, and
+   * false if it was "free" because the weights have not changed. Subclasses
+   * overriding reinitialize and calling super.reinitialize() use this to know
+   * whether to quit early.
+   *
+   * @return whether more work is needed to initialize.
+   */
+  public boolean getIsDirty() {
+    return isDirty;
+  }
+
+  /**
+   * Getter method for the configuration weights.
+   *
+   * @return the {@link WeightedPolicyInfo} representing the policy
+   *         configuration.
+   */
+  public WeightedPolicyInfo getPolicyInfo() {
+    return policyInfo;
+  }
+
+  /**
+   * Setter method for the configuration weights.
+   *
+   * @param policyInfo the {@link WeightedPolicyInfo} representing the policy
+   *          configuration.
+   */
+  public void setPolicyInfo(WeightedPolicyInfo policyInfo) {
+    this.policyInfo = policyInfo;
+  }
+
+  /**
+   * Getter method for the {@link FederationPolicyInitializationContext}.
+   *
+   * @return the context for this policy.
+   */
+  public FederationPolicyInitializationContext getPolicyContext() {
+    return policyContext;
+  }
+
+  /**
+   * Setter method for the {@link FederationPolicyInitializationContext}.
+   *
+   * @param policyContext the context to assign to this policy.
+   */
+  public void setPolicyContext(
+      FederationPolicyInitializationContext policyContext) {
+    this.policyContext = policyContext;
+  }
+
+  /**
+   * This method gets the active subclusters map from the
+   * {@code FederationStateStoreFacade} and checks that it is not null/empty.
+   *
+   * @return the map of ids to info for all active subclusters.
+   *
+   * @throws YarnException if we can't get the list.
+   */
+  protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
+      throws YarnException {
+
+    Map<SubClusterId, SubClusterInfo> activeSubclusters =
+        getPolicyContext().getFederationStateStoreFacade().getSubClusters(true);
+
+    if (activeSubclusters == null || activeSubclusters.size() < 1) {
+      throw new NoActiveSubclustersException(
+          "Zero active subclusters, cannot pick where to send job.");
+    }
+    return activeSubclusters;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java
index fd6ceea..5245772 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java
@@ -31,14 +31,11 @@ public interface ConfigurableFederationPolicy {
    * policies. The implementor should provide try-n-swap semantics, and retain
    * state if possible.
    *
-   * @param federationPolicyInitializationContext the new context to provide to
-   *                                              implementor.
+   * @param policyContext the new context to provide to implementor.
    *
    * @throws FederationPolicyInitializationException in case the initialization
-   *                                                 fails.
+   *           fails.
    */
-  void reinitialize(
-      FederationPolicyInitializationContext
-          federationPolicyInitializationContext)
+  void reinitialize(FederationPolicyInitializationContext policyContext)
       throws FederationPolicyInitializationException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
index 9347fd0..46dd6eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.federation.policies;
 
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 
@@ -30,6 +31,7 @@ public class FederationPolicyInitializationContext {
   private SubClusterPolicyConfiguration federationPolicyConfiguration;
   private SubClusterResolver federationSubclusterResolver;
   private FederationStateStoreFacade federationStateStoreFacade;
+  private SubClusterId homeSubcluster;
 
   public FederationPolicyInitializationContext() {
     federationPolicyConfiguration = null;
@@ -37,20 +39,19 @@ public class FederationPolicyInitializationContext {
     federationStateStoreFacade = null;
   }
 
-  public FederationPolicyInitializationContext(SubClusterPolicyConfiguration
-      policy, SubClusterResolver resolver, FederationStateStoreFacade
-      storeFacade) {
+  public FederationPolicyInitializationContext(
+      SubClusterPolicyConfiguration policy, SubClusterResolver resolver,
+      FederationStateStoreFacade storeFacade) {
     this.federationPolicyConfiguration = policy;
     this.federationSubclusterResolver = resolver;
     this.federationStateStoreFacade = storeFacade;
   }
 
-
   /**
    * Getter for the {@link SubClusterPolicyConfiguration}.
    *
    * @return the {@link SubClusterPolicyConfiguration} to be used for
-   * initialization.
+   *         initialization.
    */
   public SubClusterPolicyConfiguration getSubClusterPolicyConfiguration() {
     return federationPolicyConfiguration;
@@ -59,8 +60,8 @@ public class FederationPolicyInitializationContext {
   /**
    * Setter for the {@link SubClusterPolicyConfiguration}.
    *
-   * @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration}
-   *                               to be used for initialization.
+   * @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration} to
+   *          be used for initialization.
    */
   public void setSubClusterPolicyConfiguration(
       SubClusterPolicyConfiguration fedPolicyConfiguration) {
@@ -80,7 +81,7 @@ public class FederationPolicyInitializationContext {
    * Setter for the {@link SubClusterResolver}.
    *
    * @param federationSubclusterResolver the {@link SubClusterResolver} to be
-   *                                     used for initialization.
+   *          used for initialization.
    */
   public void setFederationSubclusterResolver(
       SubClusterResolver federationSubclusterResolver) {
@@ -105,4 +106,24 @@ public class FederationPolicyInitializationContext {
       FederationStateStoreFacade federationStateStoreFacade) {
     this.federationStateStoreFacade = federationStateStoreFacade;
   }
+
+  /**
+   * Returns the current home sub-cluster. Useful for default policy behaviors.
+   *
+   * @return the home sub-cluster.
+   */
+  public SubClusterId getHomeSubcluster() {
+    return homeSubcluster;
+  }
+
+  /**
+   * Sets the home sub-cluster in the context. Useful for default policy
+   * behaviors.
+   *
+   * @param homeSubcluster value to set.
+   */
+  public void setHomeSubcluster(SubClusterId homeSubcluster) {
+    this.homeSubcluster = homeSubcluster;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
index 31f83d4..1b83bbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
@@ -25,50 +25,44 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo
 public final class FederationPolicyInitializationContextValidator {
 
   private FederationPolicyInitializationContextValidator() {
-    //disable constructor per checkstyle
+    // disable constructor per checkstyle
   }
 
   public static void validate(
-      FederationPolicyInitializationContext
-          federationPolicyInitializationContext,
-      String myType) throws FederationPolicyInitializationException {
+      FederationPolicyInitializationContext policyContext, String myType)
+      throws FederationPolicyInitializationException {
 
     if (myType == null) {
-      throw new FederationPolicyInitializationException("The myType parameter"
-          + " should not be null.");
+      throw new FederationPolicyInitializationException(
+          "The myType parameter" + " should not be null.");
     }
 
-    if (federationPolicyInitializationContext == null) {
+    if (policyContext == null) {
       throw new FederationPolicyInitializationException(
           "The FederationPolicyInitializationContext provided is null. Cannot"
-              + " reinitalize "
-              + "successfully.");
+              + " reinitalize " + "successfully.");
     }
 
-    if (federationPolicyInitializationContext.getFederationStateStoreFacade()
-        == null) {
+    if (policyContext.getFederationStateStoreFacade() == null) {
       throw new FederationPolicyInitializationException(
           "The FederationStateStoreFacade provided is null. Cannot"
               + " reinitalize successfully.");
     }
 
-    if (federationPolicyInitializationContext.getFederationSubclusterResolver()
-        == null) {
+    if (policyContext.getFederationSubclusterResolver() == null) {
       throw new FederationPolicyInitializationException(
           "The FederationStateStoreFacase provided is null. Cannot"
               + " reinitalize successfully.");
     }
 
-    if (federationPolicyInitializationContext.getSubClusterPolicyConfiguration()
-        == null) {
+    if (policyContext.getSubClusterPolicyConfiguration() == null) {
       throw new FederationPolicyInitializationException(
           "The FederationSubclusterResolver provided is null. Cannot "
               + "reinitalize successfully.");
     }
 
     String intendedType =
-        federationPolicyInitializationContext.getSubClusterPolicyConfiguration()
-            .getType();
+        policyContext.getSubClusterPolicyConfiguration().getType();
 
     if (!myType.equals(intendedType)) {
       throw new FederationPolicyInitializationException(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
index e5dba63..39fdba3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
@@ -25,19 +25,19 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyCo
 /**
  *
  * Implementors need to provide the ability to serliaze a policy and its
- * configuration as a {@link SubClusterPolicyConfiguration}, as well as
- * provide (re)initialization mechanics for the underlying
+ * configuration as a {@link SubClusterPolicyConfiguration}, as well as provide
+ * (re)initialization mechanics for the underlying
  * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}.
  *
- * The serialization aspects are used by admin APIs or a policy engine to
- * store a serialized configuration in the {@code FederationStateStore},
- * while the getters methods are used to obtain a propertly inizialized
- * policy in the {@code Router} and {@code AMRMProxy} respectively.
+ * The serialization aspects are used by admin APIs or a policy engine to store
+ * a serialized configuration in the {@code FederationStateStore}, while the
+ * getter methods are used to obtain a properly initialized policy in the
+ * {@code Router} and {@code AMRMProxy} respectively.
  *
- * This interface by design binds together
- * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} and
- * provide lifecycle support for serialization and deserialization, to reduce
- * configuration mistakes (combining incompatible policies).
+ * This interface by design binds together {@link FederationAMRMProxyPolicy} and
+ * {@link FederationRouterPolicy} and provides lifecycle support for
+ * serialization and deserialization, to reduce configuration mistakes
+ * (combining incompatible policies).
  *
  */
 public interface FederationPolicyManager {
@@ -50,23 +50,17 @@ public interface FederationPolicyManager {
    * the implementors should attempt to reinitalize (retaining state). To affect
    * a complete policy reset oldInstance should be null.
    *
-   * @param federationPolicyInitializationContext the current context
-   * @param oldInstance                           the existing (possibly null)
-   *                                              instance.
+   * @param policyContext the current context
+   * @param oldInstance the existing (possibly null) instance.
    *
-   * @return an updated {@link FederationAMRMProxyPolicy
-  }.
+   * @return an updated {@link FederationAMRMProxyPolicy}.
    *
    * @throws FederationPolicyInitializationException if the initialization
-   *                                                 cannot be completed
-   *                                                 properly. The oldInstance
-   *                                                 should be still valid in
-   *                                                 case of failed
-   *                                                 initialization.
+   *           cannot be completed properly. The oldInstance should be still
+   *           valid in case of failed initialization.
    */
   FederationAMRMProxyPolicy getAMRMPolicy(
-      FederationPolicyInitializationContext
-          federationPolicyInitializationContext,
+      FederationPolicyInitializationContext policyContext,
       FederationAMRMProxyPolicy oldInstance)
       throws FederationPolicyInitializationException;
 
@@ -78,21 +72,17 @@ public interface FederationPolicyManager {
    * implementors should attempt to reinitalize (retaining state). To affect a
    * complete policy reset oldInstance shoulb be set to null.
    *
-   * @param federationPolicyInitializationContext the current context
-   * @param oldInstance                           the existing (possibly null)
-   *                                              instance.
+   * @param policyContext the current context
+   * @param oldInstance the existing (possibly null) instance.
    *
    * @return an updated {@link FederationRouterPolicy}.
    *
    * @throws FederationPolicyInitializationException if the initalization cannot
-   *                                                 be completed properly. The
-   *                                                 oldInstance should be still
-   *                                                 valid in case of failed
-   *                                                 initialization.
+   *           be completed properly. The oldInstance should be still valid in
+   *           case of failed initialization.
    */
   FederationRouterPolicy getRouterPolicy(
-      FederationPolicyInitializationContext
-          federationPolicyInitializationContext,
+      FederationPolicyInitializationContext policyContext,
       FederationRouterPolicy oldInstance)
       throws FederationPolicyInitializationException;
 
@@ -102,23 +92,24 @@ public interface FederationPolicyManager {
    * store.
    *
    * @return a valid policy configuration representing this object
-   * parametrization.
+   *         parametrization.
    *
    * @throws FederationPolicyInitializationException if the current state cannot
-   *                                                 be serialized properly
+   *           be serialized properly
    */
   SubClusterPolicyConfiguration serializeConf()
       throws FederationPolicyInitializationException;
 
-
   /**
    * This method returns the queue this policy is configured for.
+   *
    * @return the name of the queue.
    */
   String getQueue();
 
   /**
    * This methods provides a setter for the queue this policy is specified for.
+   *
    * @param queue the name of the queue.
    */
   void setQueue(String queue);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java
new file mode 100644
index 0000000..e853744
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+
+/**
+ * Base abstract class for {@link FederationAMRMProxyPolicy} implementations,
+ * that provides common validation for reinitialization.
+ */
+public abstract class AbstractAMRMProxyPolicy extends
+    AbstractConfigurableFederationPolicy implements FederationAMRMProxyPolicy {
+
+  @Override
+  public void validate(WeightedPolicyInfo newPolicyInfo)
+      throws FederationPolicyInitializationException {
+    super.validate(newPolicyInfo);
+    Map<SubClusterIdInfo, Float> newWeights =
+        newPolicyInfo.getAMRMPolicyWeights();
+    if (newWeights == null || newWeights.size() < 1) {
+      throw new FederationPolicyInitializationException(
+          "Weight vector cannot be null/empty.");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
new file mode 100644
index 0000000..679f4d5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.UnknownSubclusterException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+/**
+ * A {@link FederationAMRMProxyPolicy} that performs no splitting: the complete
+ * list of {@link ResourceRequest}s is handed to every sub-cluster that is
+ * currently active.
+ */
+public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
+
+  // sub-clusters that have been the target of at least one broadcast
+  private Set<SubClusterId> knownClusterIds = new HashSet<>();
+
+  /**
+   * Validates the context and stores it.
+   *
+   * @param policyContext the context to (re)initialize this policy with
+   *
+   * @throws FederationPolicyInitializationException if the context does not
+   *           pass validation
+   */
+  @Override
+  public void reinitialize(
+      FederationPolicyInitializationContext policyContext)
+      throws FederationPolicyInitializationException {
+    // the weight checks of the parent initialization do not apply to this
+    // policy, so the context is validated directly here
+    String policyType = this.getClass().getCanonicalName();
+    FederationPolicyInitializationContextValidator.validate(policyContext,
+        policyType);
+    setPolicyContext(policyContext);
+  }
+
+  /**
+   * Maps every active sub-cluster to the very same list of requests received
+   * in input, and remembers the sub-clusters that have been targeted.
+   *
+   * @param resourceRequests the requests to broadcast
+   *
+   * @return a map from each active sub-cluster to {@code resourceRequests}
+   *
+   * @throws YarnException if the active sub-clusters cannot be obtained
+   */
+  @Override
+  public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
+      List<ResourceRequest> resourceRequests) throws YarnException {
+
+    Map<SubClusterId, SubClusterInfo> active = getActiveSubclusters();
+    Map<SubClusterId, List<ResourceRequest>> requestsPerSubcluster =
+        new HashMap<>();
+
+    for (SubClusterId activeId : active.keySet()) {
+      knownClusterIds.add(activeId);
+      requestsPerSubcluster.put(activeId, resourceRequests);
+    }
+
+    return requestsPerSubcluster;
+  }
+
+  /**
+   * Accepts responses only from sub-clusters that were previously the target
+   * of a broadcast; the content of the response is not used.
+   *
+   * @param subClusterId the sub-cluster the response comes from
+   * @param response the response (ignored)
+   *
+   * @throws YarnException an {@link UnknownSubclusterException} if no request
+   *           was ever broadcast to {@code subClusterId}
+   */
+  @Override
+  public void notifyOfResponse(SubClusterId subClusterId,
+      AllocateResponse response) throws YarnException {
+    if (knownClusterIds.contains(subClusterId)) {
+      // stateless policy: there is nothing to record about the response
+      return;
+    }
+    throw new UnknownSubclusterException(
+        "The response is received from a subcluster that is unknown to this "
+            + "policy.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
index 4a3305c..0541df4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
@@ -17,18 +17,18 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
 
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 
-import java.util.List;
-import java.util.Map;
-
 /**
- * Implementors of this interface provide logic to split the list of {@link
- * ResourceRequest}s received by the AM among various RMs.
+ * Implementors of this interface provide logic to split the list of
+ * {@link ResourceRequest}s received by the AM among various RMs.
  */
 public interface FederationAMRMProxyPolicy
     extends ConfigurableFederationPolicy {
@@ -37,18 +37,17 @@ public interface FederationAMRMProxyPolicy
    * Splits the {@link ResourceRequest}s from the client across one or more
    * sub-clusters based on the policy semantics (e.g., broadcast, load-based).
    *
-   * @param resourceRequests the list of {@link ResourceRequest}s from the
-   *                         AM to be split
+   * @param resourceRequests the list of {@link ResourceRequest}s from the AM to
+   *          be split
    *
    * @return map of sub-cluster as identified by {@link SubClusterId} to the
-   * list of {@link ResourceRequest}s that should be forwarded to it
+   *         list of {@link ResourceRequest}s that should be forwarded to it
    *
    * @throws YarnException in case the request is malformed or no viable
-   *                       sub-clusters can be found.
+   *           sub-clusters can be found.
    */
   Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
-      List<ResourceRequest> resourceRequests)
-      throws YarnException;
+      List<ResourceRequest> resourceRequests) throws YarnException;
 
   /**
    * This method should be invoked to notify the policy about responses being
@@ -60,7 +59,7 @@ public interface FederationAMRMProxyPolicy
    *
    * @throws YarnException in case the response is not valid
    */
-  void notifyOfResponse(SubClusterId subClusterId,
-      AllocateResponse response) throws YarnException;
+  void notifyOfResponse(SubClusterId subClusterId, AllocateResponse response)
+      throws YarnException;
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
new file mode 100644
index 0000000..283f89e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An implementation of the {@link FederationAMRMProxyPolicy} interface that
+ * carefully multicasts the requests with the following behavior:
+ *
+ * <p>
+ * Host localized {@link ResourceRequest}s are always forwarded to the RM that
+ * owns the corresponding node, based on the feedback of a
+ * {@link SubClusterResolver}. If the {@link SubClusterResolver} cannot resolve
+ * this node we default to forwarding the {@link ResourceRequest} to the home
+ * sub-cluster.
+ * </p>
+ *
+ * <p>
+ * Rack localized {@link ResourceRequest}s are forwarded to the RMs that own
+ * the corresponding rack. Note that in some deployments each rack could be
+ * striped across multiple RMs. This policy respects that. If the
+ * {@link SubClusterResolver} cannot resolve this rack we default to forwarding
+ * the {@link ResourceRequest} to the home sub-cluster.
+ * </p>
+ *
+ * <p>
+ * ANY requests corresponding to node/rack local requests are forwarded only to
+ * the set of RMs that own the corresponding localized requests. The number of
+ * containers listed in each ANY is proportional to the number of localized
+ * container requests (associated to this ANY via the same allocateRequestId).
+ * </p>
+ *
+ * <p>
+ * ANY that are not associated to node/rack local requests are split among RMs
+ * based on the "weights" in the {@link WeightedPolicyInfo} configuration *and*
+ * headroom information. The {@code headroomAlpha} parameter of the policy
+ * configuration indicates how much headroom contributes to the splitting
+ * choice. Value of 1.0f indicates the weights are interpreted only as 0/1
+ * boolean but all splitting is based on the advertised headroom (fallback to
+ * 1/N for RMs that we don't have headroom info from). An {@code headroomAlpha}
+ * value of 0.0f means headroom is ignored and all splitting decisions are
+ * proportional to the "weights" in the configuration of the policy.
+ * </p>
+ *
+ * <p>
+ * ANY of zero size are forwarded to all known subclusters (i.e., subclusters
+ * where we scheduled containers before), as they may represent a user attempt
+ * to cancel a previous request (and we are mostly stateless now, so should
+ * forward to all known RMs).
+ * </p>
+ *
+ * <p>
+ * Invariants:
+ * </p>
+ *
+ * <p>
+ * The policy always excludes non-active RMs.
+ * </p>
+ *
+ * <p>
+ * The policy always excludes RMs that do not appear in the policy configuration
+ * weights, or have a weight of 0 (even if localized resources explicitly refer
+ * to it).
+ * </p>
+ *
+ * <p>
+ * (Bar rounding to closest ceiling of fractional containers) The sum of
+ * requests made to multiple RMs at the ANY level "adds-up" to the user request.
+ * The maximum possible excess in a given request is a number of containers less
+ * than or equal to the number of sub-clusters in the federation.
+ * </p>
+ */
+public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class);
+
+  // per-sub-cluster policy weights, converted from the AMRMProxy weights of
+  // the policy configuration during reinitialize
+  private Map<SubClusterId, Float> weights;
+  // used to map node and rack names to the sub-cluster(s) owning them
+  private SubClusterResolver resolver;
+
+  // latest available resources reported by each sub-cluster, as recorded by
+  // notifyOfResponse
+  private Map<SubClusterId, Resource> headroom;
+  // headroomAlpha of the policy configuration: how much headroom (versus the
+  // configured weights) drives the split of non-localized ANY requests
+  private float hrAlpha;
+  // source of the currently active sub-clusters
+  private FederationStateStoreFacade federationFacade;
+  // bookkeeper instance created on every effective reinitialize
+  private AllocationBookkeeper bookkeeper;
+  // fallback target for node/rack requests that cannot be resolved
+  private SubClusterId homeSubcluster;
+
+  /**
+   * (Re)configures this policy from the given context. After the parent
+   * reinitialization, and only if {@code getIsDirty()} reports true, the
+   * AMRMProxy weights are converted to be keyed by {@link SubClusterId}, and
+   * the resolver, headroom-alpha, state-store facade and home sub-cluster are
+   * picked up from the context. Headroom information collected so far is
+   * preserved.
+   *
+   * @param policyContext the context to (re)initialize this policy with
+   *
+   * @throws FederationPolicyInitializationException if raised by the parent
+   *           reinitialization, if no configured weight is greater than zero,
+   *           or if the context has no home sub-cluster. In the last two cases
+   *           the previous policy info is restored before throwing.
+   */
+  @Override
+  public void reinitialize(
+      FederationPolicyInitializationContext policyContext)
+      throws FederationPolicyInitializationException {
+
+    // save reference to the old policy info, to restore it upon rejection
+    WeightedPolicyInfo tempPolicy = getPolicyInfo();
+
+    super.reinitialize(policyContext);
+    if (!getIsDirty()) {
+      return;
+    }
+
+    WeightedPolicyInfo policy = getPolicyInfo();
+    Map<SubClusterIdInfo, Float> configuredWeights =
+        policy.getAMRMPolicyWeights();
+    Map<SubClusterId, Float> newWeightsConverted = new HashMap<>();
+
+    // a missing/empty weight map is not reported as "all zero" by this check
+    boolean allInactive =
+        configuredWeights != null && !configuredWeights.isEmpty();
+    if (configuredWeights != null) {
+      for (Map.Entry<SubClusterIdInfo, Float> e
+          : configuredWeights.entrySet()) {
+        if (e.getValue() > 0) {
+          allInactive = false;
+        }
+        newWeightsConverted.put(e.getKey().toId(), e.getValue());
+      }
+    }
+    if (allInactive) {
+      // reset the policyInfo and throw
+      setPolicyInfo(tempPolicy);
+      throw new FederationPolicyInitializationException(
+          "The weights used to configure "
+              + "this policy are all set to zero! (no ResourceRequest could be "
+              + "forwarded with this setting.)");
+    }
+
+    if (policyContext.getHomeSubcluster() == null) {
+      setPolicyInfo(tempPolicy);
+      throw new FederationPolicyInitializationException("The homeSubcluster "
+          + "field in the context must be initialized to use this policy");
+    }
+
+    weights = newWeightsConverted;
+    resolver = policyContext.getFederationSubclusterResolver();
+
+    // headroom survives reinitializations, it is only created once
+    if (headroom == null) {
+      headroom = new ConcurrentHashMap<>();
+    }
+    hrAlpha = policy.getHeadroomAlpha();
+
+    this.federationFacade =
+        policyContext.getFederationStateStoreFacade();
+    this.bookkeeper = new AllocationBookkeeper();
+    this.homeSubcluster = policyContext.getHomeSubcluster();
+
+  }
+
+  /**
+   * Records the headroom (available resources) advertised by a sub-cluster in
+   * its response. Responses that carry no available resources are skipped and
+   * leave the previously recorded headroom untouched.
+   *
+   * @param subClusterId the sub-cluster the response comes from
+   * @param response the response of that sub-cluster
+   *
+   * @throws YarnException declared by the interface, not raised here
+   */
+  @Override
+  public void notifyOfResponse(SubClusterId subClusterId,
+      AllocateResponse response) throws YarnException {
+    // stateless policy does not care about responses except tracking headroom
+    Resource available = response.getAvailableResources();
+    // headroom is a ConcurrentHashMap, which rejects null values with a
+    // NullPointerException
+    if (available != null) {
+      headroom.put(subClusterId, available);
+    }
+  }
+
+  /**
+   * Splits the requests among the sub-clusters that are both active and
+   * enabled by the policy weights. Requests are processed as follows:
+   * ANY requests are set aside and split at the end; the other requests are
+   * first resolved as node names, then as rack names, and finally fall back to
+   * the home sub-cluster (or are dropped if the home sub-cluster is not active
+   * and enabled).
+   *
+   * @param resourceRequests the requests to split
+   *
+   * @return the requests to forward to each sub-cluster; the map is owned by
+   *         the caller and is not modified by later invocations
+   *
+   * @throws YarnException if none of the enabled sub-clusters is active, or
+   *           if the active sub-clusters cannot be retrieved
+   */
+  @Override
+  public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
+      List<ResourceRequest> resourceRequests) throws YarnException {
+
+    // object used to accumulate the answer and statistics about it,
+    // initialized with the active subclusters. A fresh instance is used for
+    // every invocation: re-initializing a shared one clears the very map that
+    // was returned to the previous caller.
+    AllocationBookkeeper requestBookkeeper = new AllocationBookkeeper();
+    requestBookkeeper.reinitialize(federationFacade.getSubClusters(true));
+
+    List<ResourceRequest> nonLocalizedRequests = new ArrayList<>();
+
+    // if the RR is resolved to a local subcluster add it directly (node and
+    // resolvable racks)
+    for (ResourceRequest rr : resourceRequests) {
+      String resourceName = rr.getResourceName();
+
+      // Handle: ANY (accumulated for later)
+      if (ResourceRequest.isAnyLocation(resourceName)) {
+        nonLocalizedRequests.add(rr);
+        continue;
+      }
+
+      // Handle "node" requests
+      SubClusterId targetId = null;
+      try {
+        targetId = resolver.getSubClusterForNode(resourceName);
+      } catch (YarnException ignored) {
+        // this might happen as we can't differentiate node from rack names
+        // we log altogether later
+      }
+      if (requestBookkeeper.isActiveAndEnabled(targetId)) {
+        requestBookkeeper.addLocalizedNodeRR(targetId, rr);
+        continue;
+      }
+
+      // Handle "rack" requests
+      Set<SubClusterId> targetIds = null;
+      try {
+        targetIds = resolver.getSubClustersForRack(resourceName);
+      } catch (YarnException ignored) {
+        // this might happen as we can't differentiate node from rack names
+        // we log altogether later
+      }
+      if (targetIds != null && !targetIds.isEmpty()) {
+        for (SubClusterId tid : targetIds) {
+          if (requestBookkeeper.isActiveAndEnabled(tid)) {
+            requestBookkeeper.addRackRR(tid, rr);
+          }
+        }
+        continue;
+      }
+
+      // Handle node/rack requests that the SubClusterResolver cannot map to
+      // any cluster. Defaulting to home subcluster.
+      LOG.debug("ERROR resolving sub-cluster for resourceName: {}"
+          + " we are falling back to homeSubCluster:{}", resourceName,
+          homeSubcluster);
+
+      // If home-subcluster is not active, ignore node/rack request
+      if (requestBookkeeper.isActiveAndEnabled(homeSubcluster)) {
+        requestBookkeeper.addLocalizedNodeRR(homeSubcluster, rr);
+      } else {
+        LOG.debug("The homeSubCluster ({}) we are "
+            + "defaulting to is not active, the ResourceRequest "
+            + "will be ignored.", homeSubcluster);
+      }
+    }
+
+    // handle all non-localized requests (ANY)
+    splitAnyRequests(nonLocalizedRequests, requestBookkeeper);
+
+    return requestBookkeeper.getAnswer();
+  }
+
+  /**
+   * Distributes each non-localized (ANY) request among sub-clusters. An ANY
+   * that shares its allocation id with localized requests is only sent to the
+   * sub-clusters targeted by those requests; any other ANY is sent to all the
+   * active and enabled sub-clusters.
+   */
+  private void splitAnyRequests(List<ResourceRequest> originalResourceRequests,
+      AllocationBookkeeper allocationBookkeeper) throws YarnException {
+
+    for (ResourceRequest anyRequest : originalResourceRequests) {
+
+      // FIRST: choose the candidate sub-clusters for this request
+      Set<SubClusterId> localizedTargets = allocationBookkeeper
+          .getSubClustersForId(anyRequest.getAllocationRequestId());
+      Set<SubClusterId> targetSubclusters = (localizedTargets != null)
+          ? localizedTargets
+          : allocationBookkeeper.getActiveAndEnabledSC();
+
+      // SECOND: decide how many containers to ask to each of them
+      splitIndividualAny(anyRequest, targetSubclusters, allocationBookkeeper);
+    }
+  }
+
+  /**
+   * Adds to the answer, for each target sub-cluster, the portion of this ANY
+   * {@link ResourceRequest} that belongs to it. The portion is derived from
+   * the count of localized containers sharing the same allocation id, when
+   * there are any, and otherwise from a mix of headroom and policy weights
+   * controlled by {@code hrAlpha}. Fractional containers are rounded up.
+   */
+  private void splitIndividualAny(ResourceRequest originalResourceRequest,
+      Set<SubClusterId> targetSubclusters,
+      AllocationBookkeeper allocationBookkeeper) {
+
+    long allocationId = originalResourceRequest.getAllocationRequestId();
+    int requestedContainers = originalResourceRequest.getNumContainers();
+
+    // whether localized asks exist for this allocation id; adding ANY
+    // requests below does not alter this, so it is evaluated only once
+    boolean hasLocalizedAsks =
+        allocationBookkeeper.getSubClustersForId(allocationId) != null;
+
+    for (SubClusterId targetId : targetSubclusters) {
+
+      // A zero-sized ANY (possibly the user cancelling a previous ask) is
+      // forwarded as-is to every RM we already received a response from.
+      if (requestedContainers == 0 && headroom.containsKey(targetId)) {
+        allocationBookkeeper.addAnyRR(targetId, originalResourceRequest);
+      }
+
+      float share;
+      if (hasLocalizedAsks) {
+        // follow the ratio of the associated localized asks
+        share = getLocalityBasedWeighting(allocationId, targetId,
+            allocationBookkeeper);
+      } else {
+        // blend load (headroom) and policy configuration through hrAlpha
+        float headroomWeighting =
+            getHeadroomWeighting(targetId, allocationBookkeeper);
+        float policyWeighting =
+            getPolicyConfigWeighting(targetId, allocationBookkeeper);
+        share = hrAlpha * headroomWeighting + (1 - hrAlpha) * policyWeighting;
+      }
+      float numContainer = requestedContainers * share;
+
+      // only non-empty portions make it to the answer
+      if (numContainer > 0) {
+        ResourceRequest out =
+            ResourceRequest.newInstance(originalResourceRequest.getPriority(),
+                originalResourceRequest.getResourceName(),
+                originalResourceRequest.getCapability(),
+                requestedContainers,
+                originalResourceRequest.getRelaxLocality(),
+                originalResourceRequest.getNodeLabelExpression(),
+                originalResourceRequest.getExecutionTypeRequest());
+        out.setAllocationRequestId(allocationId);
+        out.setNumContainers((int) Math.ceil(numContainer));
+        if (ResourceRequest.isAnyLocation(out.getResourceName())) {
+          allocationBookkeeper.addAnyRR(targetId, out);
+        } else {
+          allocationBookkeeper.addRackRR(targetId, out);
+        }
+      }
+    }
+  }
+
+  /**
+   * Compute the weight to assign to a subcluster based on how many localized
+   * containers of the given allocation id target it, relative to all the
+   * localized containers of that same allocation id. Normalizing per
+   * allocation id (rather than over the localized containers of all the
+   * requests) keeps the portions of each ANY adding up to the whole request
+   * also when several allocation ids are split together.
+   *
+   * @param reqId the allocation id of the ANY request being split
+   * @param targetId the sub-cluster to compute the weight for
+   * @param allocationBookkeeper the statistics accumulated so far
+   *
+   * @return a value in [0, 1], or 0 if no localized container is known for
+   *         {@code reqId}
+   */
+  private float getLocalityBasedWeighting(long reqId, SubClusterId targetId,
+      AllocationBookkeeper allocationBookkeeper) {
+    Set<SubClusterId> localizedTargets =
+        allocationBookkeeper.getSubClustersForId(reqId);
+    if (localizedTargets == null) {
+      return 0;
+    }
+
+    // total number of localized containers for this allocation id only
+    long totForId = 0;
+    for (SubClusterId sc : localizedTargets) {
+      totForId += allocationBookkeeper.getNumLocalizedContainers(reqId, sc);
+    }
+
+    float totWeight = totForId;
+    float localWeight =
+        allocationBookkeeper.getNumLocalizedContainers(reqId, targetId);
+    return totWeight > 0 ? localWeight / totWeight : 0;
+  }
+
+  /**
+   * Returns the fraction of the total policy weight (as accumulated by the
+   * bookkeeper over the active and enabled subclusters) that is configured
+   * for the given subcluster, or 0 when the subcluster has no configured
+   * weight or the total is not positive.
+   */
+  private float getPolicyConfigWeighting(SubClusterId targetId,
+      AllocationBookkeeper allocationBookkeeper) {
+    Float configuredWeight = weights.get(targetId);
+    float overallWeight = allocationBookkeeper.totPolicyWeight;
+    if (configuredWeight == null || !(overallWeight > 0)) {
+      return 0;
+    }
+    return configuredWeight / overallWeight;
+  }
+
+  /**
+   * Returns the headroom-based weighting of a subcluster. When headroom is
+   * known for the subcluster and the total headroom memory is positive, this
+   * is its share of the total headroom memory, scaled by the fraction of
+   * active/enabled RMs that reported headroom. In every other case (RM not
+   * seen yet, or no headroom memory reported overall) it is 1/N, N being the
+   * number of active and enabled subclusters.
+   */
+  private float getHeadroomWeighting(SubClusterId targetId,
+      AllocationBookkeeper allocationBookkeeper) {
+
+    int numActiveAndEnabled =
+        allocationBookkeeper.getActiveAndEnabledSC().size();
+
+    // headroom can be used only if known for this sub-cluster, and only if
+    // the division by the total is safe
+    boolean headroomUsable = headroom.containsKey(targetId)
+        && allocationBookkeeper.totHeadroomMemory > 0;
+    if (!headroomUsable) {
+      // baseline weight, identical for all RMs
+      return 1 / (float) numActiveAndEnabled;
+    }
+
+    // portion of the active/enabled RMs that reported their headroom; it
+    // compensates for the missing information, so that the sum of allocated
+    // containers stays close to what the user asked (small excess)
+    float ratioHeadroomKnown =
+        allocationBookkeeper.totHeadRoomEnabledRMs
+            / (float) numActiveAndEnabled;
+
+    // headroom memory of this sub-cluster over the total headroom memory
+    float memoryShare = headroom.get(targetId).getMemorySize()
+        / allocationBookkeeper.totHeadroomMemory;
+
+    return memoryShare * ratioHeadroomKnown;
+  }
+
+  /**
+   * This helper class is used to book-keep the requests made to each
+   * subcluster, and maintain useful statistics to split ANY requests. It reads
+   * the {@code weights} and {@code headroom} of the enclosing policy when it
+   * is (re)initialized.
+   */
+  private final class AllocationBookkeeper {
+
+    // the answer being accumulated
+    private Map<SubClusterId, List<ResourceRequest>> answer = new TreeMap<>();
+
+    // stores how many containers we have allocated in each RM for localized
+    // asks, used to correctly "spread" the corresponding ANY
+    private Map<Long, Map<SubClusterId, AtomicLong>> countContainersPerRM =
+        new HashMap<>();
+
+    // subclusters that are active and have a positive policy weight
+    private Set<SubClusterId> activeAndEnabledSC = new HashSet<>();
+    // containers of all the localized node requests added so far
+    private long totNumLocalizedContainers = 0;
+    // sum of the headroom memory of the active and enabled subclusters
+    private float totHeadroomMemory = 0;
+    // number of active and enabled subclusters with known headroom
+    private int totHeadRoomEnabledRMs = 0;
+    // sum of the policy weights of the active and enabled subclusters
+    private float totPolicyWeight = 0;
+
+    /**
+     * Clears all the accumulated state and recomputes the statistics that
+     * depend on the set of active subclusters.
+     *
+     * @param activeSubclusters the currently active subclusters
+     *
+     * @throws YarnException a {@link NoActiveSubclustersException} if none of
+     *           the subclusters with a positive weight is active
+     */
+    private void reinitialize(
+        Map<SubClusterId, SubClusterInfo> activeSubclusters)
+        throws YarnException {
+
+      // reset data structures
+      answer.clear();
+      countContainersPerRM.clear();
+      activeAndEnabledSC.clear();
+      totNumLocalizedContainers = 0;
+      totHeadroomMemory = 0;
+      totHeadRoomEnabledRMs = 0;
+      totPolicyWeight = 0;
+
+      // pre-compute the set of subclusters that are both active and enabled by
+      // the policy weights, and accumulate their total weight
+      for (Map.Entry<SubClusterId, Float> entry : weights.entrySet()) {
+        if (entry.getValue() > 0
+            && activeSubclusters.containsKey(entry.getKey())) {
+          activeAndEnabledSC.add(entry.getKey());
+          totPolicyWeight += entry.getValue();
+        }
+      }
+
+      if (activeAndEnabledSC.isEmpty()) {
+        throw new NoActiveSubclustersException(
+            "None of the subclusters enabled in this policy (weight>0) are "
+                + "currently active we cannot forward the ResourceRequest(s)");
+      }
+
+      // pre-compute headroom-based weights for active/enabled subclusters
+      for (Map.Entry<SubClusterId, Resource> r : headroom.entrySet()) {
+        if (activeAndEnabledSC.contains(r.getKey())) {
+          totHeadroomMemory += r.getValue().getMemorySize();
+          totHeadRoomEnabledRMs++;
+        }
+      }
+
+    }
+
+    /**
+     * Add to the answer a localized node request, and keeps track of statistics
+     * on a per-allocation-id and per-subcluster basis.
+     */
+    private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) {
+      Preconditions
+          .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
+
+      // create the per-id and per-subcluster counters on first use
+      countContainersPerRM
+          .computeIfAbsent(rr.getAllocationRequestId(), id -> new HashMap<>())
+          .computeIfAbsent(targetId, sc -> new AtomicLong(0))
+          .addAndGet(rr.getNumContainers());
+
+      totNumLocalizedContainers += rr.getNumContainers();
+
+      internalAddToAnswer(targetId, rr);
+    }
+
+    /**
+     * Add a rack-local request to the final answer.
+     */
+    public void addRackRR(SubClusterId targetId, ResourceRequest rr) {
+      Preconditions
+          .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
+      internalAddToAnswer(targetId, rr);
+    }
+
+    /**
+     * Add an ANY request to the final answer.
+     */
+    private void addAnyRR(SubClusterId targetId, ResourceRequest rr) {
+      Preconditions
+          .checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName()));
+      internalAddToAnswer(targetId, rr);
+    }
+
+    /**
+     * Append a request to the list of the target subcluster, creating the list
+     * if this is the first request for it.
+     */
+    private void internalAddToAnswer(SubClusterId targetId,
+        ResourceRequest partialRR) {
+      answer.computeIfAbsent(targetId, sc -> new ArrayList<>()).add(partialRR);
+    }
+
+    /**
+     * Return all known subclusters associated with an allocation id.
+     *
+     * @param allocationId the allocation id considered
+     *
+     * @return the set of {@link SubClusterId}s associated with this allocation
+     *         id, or null if no localized node request was added for it
+     */
+    private Set<SubClusterId> getSubClustersForId(long allocationId) {
+      Map<SubClusterId, AtomicLong> perSubcluster =
+          countContainersPerRM.get(allocationId);
+      return perSubcluster == null ? null : perSubcluster.keySet();
+    }
+
+    /**
+     * Return the answer accumulated so far.
+     *
+     * @return the answer
+     */
+    private Map<SubClusterId, List<ResourceRequest>> getAnswer() {
+      return answer;
+    }
+
+    /**
+     * Return the set of sub-clusters that are both active and allowed by our
+     * policy (weight > 0).
+     *
+     * @return a set of active and enabled {@link SubClusterId}s
+     */
+    private Set<SubClusterId> getActiveAndEnabledSC() {
+      return activeAndEnabledSC;
+    }
+
+    /**
+     * Return the total number of containers coming from localized requests.
+     */
+    private long getTotNumLocalizedContainers() {
+      return totNumLocalizedContainers;
+    }
+
+    /**
+     * Returns the number of containers matching an allocation Id that are
+     * localized in the targetId subcluster, or 0 if nothing is known about
+     * the allocation id or the subcluster.
+     */
+    private long getNumLocalizedContainers(long allocationId,
+        SubClusterId targetId) {
+      Map<SubClusterId, AtomicLong> perSubcluster =
+          countContainersPerRM.get(allocationId);
+      if (perSubcluster == null) {
+        // unknown allocation id: avoid dereferencing the missing entry
+        return 0;
+      }
+      AtomicLong c = perSubcluster.get(targetId);
+      return c == null ? 0 : c.get();
+    }
+
+    /**
+     * Returns true if the subcluster is both active and enabled (a null
+     * subcluster is never).
+     */
+    private boolean isActiveAndEnabled(SubClusterId targetId) {
+      return targetId != null && getActiveAndEnabledSC().contains(targetId);
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java
index 99da20b..ef72647 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java
@@ -17,4 +17,3 @@
  */
 /** AMRMPRoxy policies. **/
 package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
index a0fa37f..62eb03b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
@@ -17,10 +17,19 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.dao;
 
-import com.sun.jersey.api.json.JSONConfiguration;
-import com.sun.jersey.api.json.JSONJAXBContext;
-import com.sun.jersey.api.json.JSONMarshaller;
-import com.sun.jersey.api.json.JSONUnmarshaller;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -29,24 +38,16 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONMarshaller;
+import com.sun.jersey.api.json.JSONUnmarshaller;
 
 /**
  * This is a DAO class for the configuration of parameteres for federation
  * policies. This generalizes several possible configurations as two lists of
- * {@link SubClusterIdInfo} and corresponding weights as a
- * {@link Float}. The interpretation of the weight is left to the logic in
- * the policy.
+ * {@link SubClusterIdInfo} and corresponding weights as a {@link Float}. The
+ * interpretation of the weight is left to the logic in the policy.
  */
 
 @InterfaceAudience.Private
@@ -57,12 +58,14 @@ public class WeightedPolicyInfo {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(WeightedPolicyInfo.class);
-
+  private static JSONJAXBContext jsonjaxbContext = initContext();
   private Map<SubClusterIdInfo, Float> routerPolicyWeights = new HashMap<>();
   private Map<SubClusterIdInfo, Float> amrmPolicyWeights = new HashMap<>();
   private float headroomAlpha;
 
-  private static JSONJAXBContext jsonjaxbContext = initContext();
+  public WeightedPolicyInfo() {
+    // JAXB needs this
+  }
 
   private static JSONJAXBContext initContext() {
     try {
@@ -74,46 +77,6 @@ public class WeightedPolicyInfo {
     return null;
   }
 
-  public WeightedPolicyInfo() {
-    //JAXB needs this
-  }
-
-  /**
-   * Setter method for Router weights.
-   *
-   * @param policyWeights the router weights.
-   */
-  public void setRouterPolicyWeights(
-      Map<SubClusterIdInfo, Float> policyWeights) {
-    this.routerPolicyWeights = policyWeights;
-  }
-
-  /**
-   * Setter method for ARMRMProxy weights.
-   *
-   * @param policyWeights the amrmproxy weights.
-   */
-  public void setAMRMPolicyWeights(
-      Map<SubClusterIdInfo, Float> policyWeights) {
-    this.amrmPolicyWeights = policyWeights;
-  }
-
-  /**
-   * Getter of the router weights.
-   * @return the router weights.
-   */
-  public Map<SubClusterIdInfo, Float> getRouterPolicyWeights() {
-    return routerPolicyWeights;
-  }
-
-  /**
-   * Getter for AMRMProxy weights.
-   * @return the AMRMProxy weights.
-   */
-  public Map<SubClusterIdInfo, Float> getAMRMPolicyWeights() {
-    return amrmPolicyWeights;
-  }
-
   /**
    * Deserializes a {@link WeightedPolicyInfo} from a byte UTF-8 JSON
    * representation.
@@ -123,14 +86,14 @@ public class WeightedPolicyInfo {
    * @return the {@link WeightedPolicyInfo} represented.
    *
    * @throws FederationPolicyInitializationException if a deserializaiton error
-   *                                                 occurs.
+   *           occurs.
    */
   public static WeightedPolicyInfo fromByteBuffer(ByteBuffer bb)
       throws FederationPolicyInitializationException {
 
     if (jsonjaxbContext == null) {
-      throw new FederationPolicyInitializationException("JSONJAXBContext should"
-          + " not be null.");
+      throw new FederationPolicyInitializationException(
+          "JSONJAXBContext should" + " not be null.");
     }
 
     try {
@@ -139,9 +102,8 @@ public class WeightedPolicyInfo {
       bb.get(bytes);
       String params = new String(bytes, Charset.forName("UTF-8"));
 
-      WeightedPolicyInfo weightedPolicyInfo = unmarshaller
-          .unmarshalFromJSON(new StringReader(params),
-              WeightedPolicyInfo.class);
+      WeightedPolicyInfo weightedPolicyInfo = unmarshaller.unmarshalFromJSON(
+          new StringReader(params), WeightedPolicyInfo.class);
       return weightedPolicyInfo;
     } catch (JAXBException j) {
       throw new FederationPolicyInitializationException(j);
@@ -149,19 +111,56 @@ public class WeightedPolicyInfo {
   }
 
   /**
-   * Converts the policy into a byte array representation in the input {@link
-   * ByteBuffer}.
+   * Getter of the router weights.
+   *
+   * @return the router weights.
+   */
+  public Map<SubClusterIdInfo, Float> getRouterPolicyWeights() {
+    return routerPolicyWeights;
+  }
+
+  /**
+   * Setter method for Router weights.
+   *
+   * @param policyWeights the router weights.
+   */
+  public void setRouterPolicyWeights(
+      Map<SubClusterIdInfo, Float> policyWeights) {
+    this.routerPolicyWeights = policyWeights;
+  }
+
+  /**
+   * Getter for AMRMProxy weights.
+   *
+   * @return the AMRMProxy weights.
+   */
+  public Map<SubClusterIdInfo, Float> getAMRMPolicyWeights() {
+    return amrmPolicyWeights;
+  }
+
+  /**
+   * Setter method for AMRMProxy weights.
+   *
+   * @param policyWeights the amrmproxy weights.
+   */
+  public void setAMRMPolicyWeights(Map<SubClusterIdInfo, Float> policyWeights) {
+    this.amrmPolicyWeights = policyWeights;
+  }
+
+  /**
+   * Converts the policy into a byte array representation returned as a
+   * {@link ByteBuffer}.
    *
    * @return byte array representation of this policy configuration.
    *
    * @throws FederationPolicyInitializationException if a serialization error
-   *                                                 occurs.
+   *           occurs.
    */
   public ByteBuffer toByteBuffer()
       throws FederationPolicyInitializationException {
     if (jsonjaxbContext == null) {
-      throw new FederationPolicyInitializationException("JSONJAXBContext should"
-          + " not be null.");
+      throw new FederationPolicyInitializationException(
+          "JSONJAXBContext should" + " not be null.");
     }
     try {
       String s = toJSONString();
@@ -186,22 +185,21 @@ public class WeightedPolicyInfo {
       return false;
     }
 
-    WeightedPolicyInfo otherPolicy =
-        (WeightedPolicyInfo) other;
+    WeightedPolicyInfo otherPolicy = (WeightedPolicyInfo) other;
     Map<SubClusterIdInfo, Float> otherAMRMWeights =
         otherPolicy.getAMRMPolicyWeights();
     Map<SubClusterIdInfo, Float> otherRouterWeights =
         otherPolicy.getRouterPolicyWeights();
 
-    boolean amrmWeightsMatch = otherAMRMWeights != null &&
-        getAMRMPolicyWeights() != null &&
-        CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(),
-            getAMRMPolicyWeights().entrySet());
+    boolean amrmWeightsMatch =
+        otherAMRMWeights != null && getAMRMPolicyWeights() != null
+            && CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(),
+                getAMRMPolicyWeights().entrySet());
 
-    boolean routerWeightsMatch = otherRouterWeights != null &&
-        getRouterPolicyWeights() != null &&
-        CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(),
-            getRouterPolicyWeights().entrySet());
+    boolean routerWeightsMatch =
+        otherRouterWeights != null && getRouterPolicyWeights() != null
+            && CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(),
+                getRouterPolicyWeights().entrySet());
 
     return amrmWeightsMatch && routerWeightsMatch;
   }
@@ -215,10 +213,10 @@ public class WeightedPolicyInfo {
    * Return the parameter headroomAlpha, used by policies that balance
    * weight-based and load-based considerations in their decisions.
    *
-   * For policies that use this parameter, values close to 1 indicate that
-   * most of the decision should be based on currently observed headroom from
-   * various sub-clusters, values close to zero, indicate that the decision
-   * should be mostly based on weights and practically ignore current load.
+   * For policies that use this parameter, values close to 1 indicate that most
+   * of the decision should be based on currently observed headroom from various
+   * sub-clusters, values close to zero, indicate that the decision should be
+   * mostly based on weights and practically ignore current load.
    *
    * @return the value of headroomAlpha.
    */
@@ -227,13 +225,13 @@ public class WeightedPolicyInfo {
   }
 
   /**
-   * Set the parameter headroomAlpha, used by policies that balance
-   * weight-based and load-based considerations in their decisions.
+   * Set the parameter headroomAlpha, used by policies that balance weight-based
+   * and load-based considerations in their decisions.
    *
-   * For policies that use this parameter, values close to 1 indicate that
-   * most of the decision should be based on currently observed headroom from
-   * various sub-clusters, values close to zero, indicate that the decision
-   * should be mostly based on weights and practically ignore current load.
+   * For policies that use this parameter, values close to 1 indicate that most
+   * of the decision should be based on currently observed headroom from various
+   * sub-clusters, values close to zero, indicate that the decision should be
+   * mostly based on weights and practically ignore current load.
    *
    * @param headroomAlpha the value to use for balancing.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java
index 43f5b83..c292e52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java
@@ -17,4 +17,3 @@
  */
 /** DAO objects for serializing/deserializing policy configurations. **/
 package org.apache.hadoop.yarn.server.federation.policies.dao;
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java
index 3318da9..ad2d543 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java
@@ -17,4 +17,3 @@
  */
 /** Exceptions for policies. **/
 package org.apache.hadoop.yarn.server.federation.policies.exceptions;
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java
index 7d9a121..fa3fcc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java
@@ -17,4 +17,3 @@
  */
 /** Federation Policies. **/
 package org.apache.hadoop.yarn.server.federation.policies;
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
new file mode 100644
index 0000000..f49af1d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+
+/**
+ * Base abstract class for {@link FederationRouterPolicy} implementations, that
+ * provides common validation for reinitialization.
+ */
+public abstract class AbstractRouterPolicy extends
+    AbstractConfigurableFederationPolicy implements FederationRouterPolicy {
+
+  @Override
+  public void validate(WeightedPolicyInfo newPolicyInfo)
+      throws FederationPolicyInitializationException {
+    super.validate(newPolicyInfo);
+    Map<SubClusterIdInfo, Float> newWeights =
+        newPolicyInfo.getRouterPolicyWeights();
+    if (newWeights == null || newWeights.size() < 1) {
+      throw new FederationPolicyInitializationException(
+          "Weight vector cannot be null/empty.");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java
deleted file mode 100644
index e888979..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies.router;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
-import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
-import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-
-import java.util.Map;
-
-/**
- * Abstract class provides common validation of reinitialize(), for all
- * policies that are "weight-based".
- */
-public abstract class BaseWeightedRouterPolicy
-    implements FederationRouterPolicy {
-
-  private WeightedPolicyInfo policyInfo = null;
-  private FederationPolicyInitializationContext policyContext;
-
-  public BaseWeightedRouterPolicy() {
-  }
-
-  @Override
-  public void reinitialize(FederationPolicyInitializationContext
-      federationPolicyContext)
-      throws FederationPolicyInitializationException {
-    FederationPolicyInitializationContextValidator
-        .validate(federationPolicyContext, this.getClass().getCanonicalName());
-
-    // perform consistency checks
-    WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo
-        .fromByteBuffer(
-            federationPolicyContext.getSubClusterPolicyConfiguration()
-                .getParams());
-
-    // if nothing has changed skip the rest of initialization
-    if (policyInfo != null && policyInfo.equals(newPolicyInfo)) {
-      return;
-    }
-
-    validate(newPolicyInfo);
-    setPolicyInfo(newPolicyInfo);
-    this.policyContext = federationPolicyContext;
-  }
-
-  /**
-   * Overridable validation step for the policy configuration.
-   * @param newPolicyInfo the configuration to test.
-   * @throws FederationPolicyInitializationException if the configuration is
-   * not valid.
-   */
-  public void validate(WeightedPolicyInfo newPolicyInfo) throws
-      FederationPolicyInitializationException {
-    if (newPolicyInfo == null) {
-      throw new FederationPolicyInitializationException("The policy to "
-          + "validate should not be null.");
-    }
-    Map<SubClusterIdInfo, Float> newWeights =
-        newPolicyInfo.getRouterPolicyWeights();
-    if (newWeights == null || newWeights.size() < 1) {
-      throw new FederationPolicyInitializationException(
-          "Weight vector cannot be null/empty.");
-    }
-  }
-
-
-  /**
-   * Getter method for the configuration weights.
-   *
-   * @return the {@link WeightedPolicyInfo} representing the policy
-   * configuration.
-   */
-  public WeightedPolicyInfo getPolicyInfo() {
-    return policyInfo;
-  }
-
-  /**
-   * Setter method for the configuration weights.
-   *
-   * @param policyInfo the {@link WeightedPolicyInfo} representing the policy
-   *                   configuration.
-   */
-  public void setPolicyInfo(
-      WeightedPolicyInfo policyInfo) {
-    this.policyInfo = policyInfo;
-  }
-
-  /**
-   * Getter method for the {@link FederationPolicyInitializationContext}.
-   * @return the context for this policy.
-   */
-  public FederationPolicyInitializationContext getPolicyContext() {
-    return policyContext;
-  }
-
-  /**
-   * Setter method for the {@link FederationPolicyInitializationContext}.
-   * @param policyContext the context to assign to this policy.
-   */
-  public void setPolicyContext(
-      FederationPolicyInitializationContext policyContext) {
-    this.policyContext = policyContext;
-  }
-
-  /**
-   * This methods gets active subclusters map from the {@code
-   * FederationStateStoreFacade} and validate it not being null/empty.
-   *
-   * @return the map of ids to info for all active subclusters.
-   * @throws YarnException if we can't get the list.
-   */
-  protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
-      throws YarnException {
-
-    Map<SubClusterId, SubClusterInfo> activeSubclusters = getPolicyContext()
-        .getFederationStateStoreFacade().getSubClusters(true);
-
-    if (activeSubclusters == null || activeSubclusters.size() < 1) {
-      throw new NoActiveSubclustersException(
-          "Zero active subclusters, cannot pick where to send job.");
-    }
-    return activeSubclusters;
-  }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11c53365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
index 42c86cc..90ea0a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
@@ -35,11 +35,10 @@ public interface FederationRouterPolicy extends ConfigurableFederationPolicy {
    * @param appSubmissionContext the context for the app being submitted.
    *
    * @return the sub-cluster as identified by {@link SubClusterId} to route the
-   * request to.
+   *         request to.
    *
    * @throws YarnException if the policy cannot determine a viable subcluster.
    */
   SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext)
-      throws YarnException;
+      ApplicationSubmissionContext appSubmissionContext) throws YarnException;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org