You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/09/22 00:58:34 UTC

[13/50] [abbrv] hadoop git commit: YARN-5676. Add a HashBasedRouterPolicy, and small policies and test refactoring. (Carlo Curino via Subru).

YARN-5676. Add a HashBasedRouterPolicy, and small policies and test refactoring. (Carlo Curino via Subru).

(cherry picked from commit 575137f41c27eb72d05d923337f3030a35403e8f)
(cherry picked from commit 4128c9522dcdc16bb3527f74a48ed1242458a165)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d32ffa9e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d32ffa9e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d32ffa9e

Branch: refs/heads/branch-2
Commit: d32ffa9e5ebb55b1fed4948f3750ce2159d235b4
Parents: d87d2b5
Author: Subru Krishnan <su...@apache.org>
Authored: Tue Nov 22 15:02:22 2016 -0800
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 16:23:45 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   3 +-
 .../policies/AbstractPolicyManager.java         | 175 -----------------
 .../policies/FederationPolicyManager.java       | 117 ------------
 .../PriorityBroadcastPolicyManager.java         |  66 -------
 .../federation/policies/RouterPolicyFacade.java |   1 +
 .../policies/UniformBroadcastPolicyManager.java |  56 ------
 .../policies/WeightedLocalityPolicyManager.java |  67 -------
 .../policies/manager/AbstractPolicyManager.java | 190 +++++++++++++++++++
 .../manager/FederationPolicyManager.java        | 118 ++++++++++++
 .../manager/HashBroadcastPolicyManager.java     |  38 ++++
 .../manager/PriorityBroadcastPolicyManager.java |  66 +++++++
 .../manager/UniformBroadcastPolicyManager.java  |  44 +++++
 .../manager/WeightedLocalityPolicyManager.java  |  67 +++++++
 .../policies/manager/package-info.java          |  19 ++
 .../policies/router/AbstractRouterPolicy.java   |  19 ++
 .../policies/router/HashBasedRouterPolicy.java  |  81 ++++++++
 .../policies/router/LoadBasedRouterPolicy.java  |   3 +
 .../policies/router/PriorityRouterPolicy.java   |   3 +
 .../router/UniformRandomRouterPolicy.java       |  10 +-
 .../router/WeightedRandomRouterPolicy.java      |   3 +
 .../policies/BaseFederationPoliciesTest.java    |  17 +-
 .../policies/BasePolicyManagerTest.java         | 108 -----------
 ...ionPolicyInitializationContextValidator.java |   1 +
 .../TestPriorityBroadcastPolicyManager.java     |  72 -------
 .../policies/TestRouterPolicyFacade.java        |   2 +
 .../TestUniformBroadcastPolicyManager.java      |  40 ----
 .../TestWeightedLocalityPolicyManager.java      |  79 --------
 .../policies/manager/BasePolicyManagerTest.java | 104 ++++++++++
 .../TestHashBasedBroadcastPolicyManager.java    |  40 ++++
 .../TestPriorityBroadcastPolicyManager.java     |  72 +++++++
 .../TestUniformBroadcastPolicyManager.java      |  40 ++++
 .../TestWeightedLocalityPolicyManager.java      |  79 ++++++++
 .../policies/router/BaseRouterPoliciesTest.java |  51 +++++
 .../router/TestHashBasedRouterPolicy.java       |  83 ++++++++
 .../router/TestLoadBasedRouterPolicy.java       |   3 +-
 .../router/TestPriorityRouterPolicy.java        |   3 +-
 .../router/TestUniformRandomRouterPolicy.java   |   3 +-
 .../router/TestWeightedRandomRouterPolicy.java  |  15 +-
 38 files changed, 1160 insertions(+), 798 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 055428d..906d632 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2547,7 +2547,8 @@ public class YarnConfiguration extends Configuration {
       + "policy-manager";
 
   public static final String DEFAULT_FEDERATION_POLICY_MANAGER = "org.apache"
-      + ".hadoop.yarn.server.federation.policies.UniformBroadcastPolicyManager";
+      + ".hadoop.yarn.server.federation.policies"
+      + ".manager.UniformBroadcastPolicyManager";
 
   public static final String FEDERATION_POLICY_MANAGER_PARAMS =
       FEDERATION_PREFIX + "policy-manager-params";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
deleted file mode 100644
index e77f2e3..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class provides basic implementation for common methods that multiple
- * policies will need to implement.
- */
-public abstract class AbstractPolicyManager implements
-    FederationPolicyManager {
-
-  private String queue;
-  @SuppressWarnings("checkstyle:visibilitymodifier")
-  protected Class routerFederationPolicy;
-  @SuppressWarnings("checkstyle:visibilitymodifier")
-  protected Class amrmProxyFederationPolicy;
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(AbstractPolicyManager.class);
-  /**
-   * This default implementation validates the
-   * {@link FederationPolicyInitializationContext},
-   * then checks whether it needs to reinstantiate the class (null or
-   * mismatching type), and reinitialize the policy.
-   *
-   * @param federationPolicyContext the current context
-   * @param oldInstance             the existing (possibly null) instance.
-   *
-   * @return a valid and fully reinitalized {@link FederationAMRMProxyPolicy}
-   * instance
-   *
-   * @throws FederationPolicyInitializationException if the reinitalization is
-   *                                                 not valid, and ensure
-   *                                                 previous state is preserved
-   */
-  public FederationAMRMProxyPolicy getAMRMPolicy(
-      FederationPolicyInitializationContext federationPolicyContext,
-      FederationAMRMProxyPolicy oldInstance)
-      throws FederationPolicyInitializationException {
-
-    if (amrmProxyFederationPolicy == null) {
-      throw new FederationPolicyInitializationException("The parameter "
-          + "amrmProxyFederationPolicy should be initialized in "
-          + this.getClass().getSimpleName() + " constructor.");
-    }
-
-    try {
-      return (FederationAMRMProxyPolicy) internalPolicyGetter(
-          federationPolicyContext, oldInstance, amrmProxyFederationPolicy);
-    } catch (ClassCastException e) {
-      throw new FederationPolicyInitializationException(e);
-    }
-
-  }
-
-  /**
-   * This default implementation validates the
-   * {@link FederationPolicyInitializationContext},
-   * then checks whether it needs to reinstantiate the class (null or
-   * mismatching type), and reinitialize the policy.
-   *
-   * @param federationPolicyContext the current context
-   * @param oldInstance             the existing (possibly null) instance.
-   *
-   * @return a valid and fully reinitalized {@link FederationRouterPolicy}
-   * instance
-   *
-   * @throws FederationPolicyInitializationException if the reinitalization is
-   *                                                 not valid, and ensure
-   *                                                 previous state is preserved
-   */
-
-  public FederationRouterPolicy getRouterPolicy(
-      FederationPolicyInitializationContext federationPolicyContext,
-      FederationRouterPolicy oldInstance)
-      throws FederationPolicyInitializationException {
-
-    //checks that sub-types properly initialize the types of policies
-    if (routerFederationPolicy == null) {
-      throw new FederationPolicyInitializationException("The policy "
-          + "type should be initialized in " + this.getClass().getSimpleName()
-          + " constructor.");
-    }
-
-    try {
-      return (FederationRouterPolicy) internalPolicyGetter(
-          federationPolicyContext, oldInstance, routerFederationPolicy);
-    } catch (ClassCastException e) {
-      throw new FederationPolicyInitializationException(e);
-    }
-  }
-
-  @Override
-  public String getQueue() {
-    return queue;
-  }
-
-  @Override
-  public void setQueue(String queue) {
-    this.queue = queue;
-  }
-
-  /**
-   * Common functionality to instantiate a reinitialize a {@link
-   * ConfigurableFederationPolicy}.
-   */
-  private ConfigurableFederationPolicy internalPolicyGetter(
-      final FederationPolicyInitializationContext federationPolicyContext,
-      ConfigurableFederationPolicy oldInstance, Class policy)
-      throws FederationPolicyInitializationException {
-
-    FederationPolicyInitializationContextValidator
-        .validate(federationPolicyContext, this.getClass().getCanonicalName());
-
-    if (oldInstance == null || !oldInstance.getClass().equals(policy)) {
-      try {
-        oldInstance = (ConfigurableFederationPolicy) policy.newInstance();
-      } catch (InstantiationException e) {
-        throw new FederationPolicyInitializationException(e);
-      } catch (IllegalAccessException e) {
-        throw new FederationPolicyInitializationException(e);
-      }
-    }
-
-    //copying the context to avoid side-effects
-    FederationPolicyInitializationContext modifiedContext =
-        updateContext(federationPolicyContext,
-            oldInstance.getClass().getCanonicalName());
-
-    oldInstance.reinitialize(modifiedContext);
-    return oldInstance;
-  }
-
-  /**
-   * This method is used to copy-on-write the context, that will be passed
-   * downstream to the router/amrmproxy policies.
-   */
-  private FederationPolicyInitializationContext updateContext(
-      FederationPolicyInitializationContext federationPolicyContext,
-      String type) {
-    // copying configuration and context to avoid modification of original
-    SubClusterPolicyConfiguration newConf = SubClusterPolicyConfiguration
-        .newInstance(federationPolicyContext
-            .getSubClusterPolicyConfiguration());
-    newConf.setType(type);
-
-    return new FederationPolicyInitializationContext(newConf,
-                  federationPolicyContext.getFederationSubclusterResolver(),
-                  federationPolicyContext.getFederationStateStoreFacade(),
-                  federationPolicyContext.getHomeSubcluster());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
deleted file mode 100644
index 39fdba3..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-
-/**
- *
- * Implementors need to provide the ability to serliaze a policy and its
- * configuration as a {@link SubClusterPolicyConfiguration}, as well as provide
- * (re)initialization mechanics for the underlying
- * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}.
- *
- * The serialization aspects are used by admin APIs or a policy engine to store
- * a serialized configuration in the {@code FederationStateStore}, while the
- * getters methods are used to obtain a propertly inizialized policy in the
- * {@code Router} and {@code AMRMProxy} respectively.
- *
- * This interface by design binds together {@link FederationAMRMProxyPolicy} and
- * {@link FederationRouterPolicy} and provide lifecycle support for
- * serialization and deserialization, to reduce configuration mistakes
- * (combining incompatible policies).
- *
- */
-public interface FederationPolicyManager {
-
-  /**
-   * If the current instance is compatible, this method returns the same
-   * instance of {@link FederationAMRMProxyPolicy} reinitialized with the
-   * current context, otherwise a new instance initialized with the current
-   * context is provided. If the instance is compatible with the current class
-   * the implementors should attempt to reinitalize (retaining state). To affect
-   * a complete policy reset oldInstance should be null.
-   *
-   * @param policyContext the current context
-   * @param oldInstance the existing (possibly null) instance.
-   *
-   * @return an updated {@link FederationAMRMProxyPolicy }.
-   *
-   * @throws FederationPolicyInitializationException if the initialization
-   *           cannot be completed properly. The oldInstance should be still
-   *           valid in case of failed initialization.
-   */
-  FederationAMRMProxyPolicy getAMRMPolicy(
-      FederationPolicyInitializationContext policyContext,
-      FederationAMRMProxyPolicy oldInstance)
-      throws FederationPolicyInitializationException;
-
-  /**
-   * If the current instance is compatible, this method returns the same
-   * instance of {@link FederationRouterPolicy} reinitialized with the current
-   * context, otherwise a new instance initialized with the current context is
-   * provided. If the instance is compatible with the current class the
-   * implementors should attempt to reinitalize (retaining state). To affect a
-   * complete policy reset oldInstance shoulb be set to null.
-   *
-   * @param policyContext the current context
-   * @param oldInstance the existing (possibly null) instance.
-   *
-   * @return an updated {@link FederationRouterPolicy}.
-   *
-   * @throws FederationPolicyInitializationException if the initalization cannot
-   *           be completed properly. The oldInstance should be still valid in
-   *           case of failed initialization.
-   */
-  FederationRouterPolicy getRouterPolicy(
-      FederationPolicyInitializationContext policyContext,
-      FederationRouterPolicy oldInstance)
-      throws FederationPolicyInitializationException;
-
-  /**
-   * This method is invoked to derive a {@link SubClusterPolicyConfiguration}.
-   * This is to be used when writing a policy object in the federation policy
-   * store.
-   *
-   * @return a valid policy configuration representing this object
-   *         parametrization.
-   *
-   * @throws FederationPolicyInitializationException if the current state cannot
-   *           be serialized properly
-   */
-  SubClusterPolicyConfiguration serializeConf()
-      throws FederationPolicyInitializationException;
-
-  /**
-   * This method returns the queue this policy is configured for.
-   *
-   * @return the name of the queue.
-   */
-  String getQueue();
-
-  /**
-   * This methods provides a setter for the queue this policy is specified for.
-   *
-   * @param queue the name of the queue.
-   */
-  void setQueue(String queue);
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java
deleted file mode 100644
index ebdcf42..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Policy that allows operator to configure "weights" for routing. This picks a
- * {@link PriorityRouterPolicy} for the router and a
- * {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed to
- * work together.
- */
-public class PriorityBroadcastPolicyManager extends AbstractPolicyManager {
-
-  private WeightedPolicyInfo weightedPolicyInfo;
-
-  public PriorityBroadcastPolicyManager() {
-    // this structurally hard-codes two compatible policies for Router and
-    // AMRMProxy.
-    routerFederationPolicy = PriorityRouterPolicy.class;
-    amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
-    weightedPolicyInfo = new WeightedPolicyInfo();
-  }
-
-  @Override
-  public SubClusterPolicyConfiguration serializeConf()
-      throws FederationPolicyInitializationException {
-    ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
-    return SubClusterPolicyConfiguration.newInstance(getQueue(),
-        this.getClass().getCanonicalName(), buf);
-  }
-
-  @VisibleForTesting
-  public WeightedPolicyInfo getWeightedPolicyInfo() {
-    return weightedPolicyInfo;
-  }
-
-  @VisibleForTesting
-  public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
-    this.weightedPolicyInfo = weightedPolicyInfo;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
index a3fd15a..8c22623 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
 import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
deleted file mode 100644
index a01f8fa..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-
-import java.nio.ByteBuffer;
-
-/**
- * This class represents a simple implementation of a {@code
- * FederationPolicyManager}.
- *
- * It combines the basic policies: {@link UniformRandomRouterPolicy} and
- * {@link BroadcastAMRMProxyPolicy}, which are designed to work together and
- * "spread" the load among sub-clusters uniformly.
- *
- * This simple policy might impose heavy load on the RMs and return more
- * containers than a job requested as all requests are (replicated and)
- * broadcasted.
- */
-public class UniformBroadcastPolicyManager
-    extends AbstractPolicyManager {
-
-  public UniformBroadcastPolicyManager() {
-    //this structurally hard-codes two compatible policies for Router and
-    // AMRMProxy.
-    routerFederationPolicy = UniformRandomRouterPolicy.class;
-    amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
-  }
-
-  @Override
-  public SubClusterPolicyConfiguration serializeConf()
-      throws FederationPolicyInitializationException {
-    ByteBuffer buf = ByteBuffer.allocate(0);
-    return SubClusterPolicyConfiguration
-        .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
deleted file mode 100644
index f3c6673..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-
-import java.nio.ByteBuffer;
-
-/**
- * Policy that allows operator to configure "weights" for routing. This picks a
- * {@link WeightedRandomRouterPolicy} for the router and a {@link
- * LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to
- * work together.
- */
-public class WeightedLocalityPolicyManager
-    extends AbstractPolicyManager {
-
-  private WeightedPolicyInfo weightedPolicyInfo;
-
-  public WeightedLocalityPolicyManager() {
-    //this structurally hard-codes two compatible policies for Router and
-    // AMRMProxy.
-    routerFederationPolicy =  WeightedRandomRouterPolicy.class;
-    amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class;
-    weightedPolicyInfo = new WeightedPolicyInfo();
-  }
-
-  @Override
-  public SubClusterPolicyConfiguration serializeConf()
-      throws FederationPolicyInitializationException {
-    ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
-    return SubClusterPolicyConfiguration
-        .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
-  }
-
-  @VisibleForTesting
-  public WeightedPolicyInfo getWeightedPolicyInfo() {
-    return weightedPolicyInfo;
-  }
-
-  @VisibleForTesting
-  public void setWeightedPolicyInfo(
-      WeightedPolicyInfo weightedPolicyInfo) {
-    this.weightedPolicyInfo = weightedPolicyInfo;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java
new file mode 100644
index 0000000..f7a89c6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class provides basic implementation for common methods that multiple
+ * policies will need to implement.
+ */
+public abstract class AbstractPolicyManager implements
+    FederationPolicyManager {
+
+  private String queue;
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected Class routerFederationPolicy;
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected Class amrmProxyFederationPolicy;
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(AbstractPolicyManager.class);
+  /**
+   * This default implementation validates the
+   * {@link FederationPolicyInitializationContext},
+   * then checks whether it needs to reinstantiate the class (null or
+   * mismatching type), and reinitialize the policy.
+   *
+   * @param federationPolicyContext the current context
+   * @param oldInstance             the existing (possibly null) instance.
+   *
+   * @return a valid and fully reinitalized {@link FederationAMRMProxyPolicy}
+   * instance
+   *
+   * @throws FederationPolicyInitializationException if the reinitalization is
+   *                                                 not valid, and ensure
+   *                                                 previous state is preserved
+   */
+  public FederationAMRMProxyPolicy getAMRMPolicy(
+      FederationPolicyInitializationContext federationPolicyContext,
+      FederationAMRMProxyPolicy oldInstance)
+      throws FederationPolicyInitializationException {
+
+    if (amrmProxyFederationPolicy == null) {
+      throw new FederationPolicyInitializationException("The parameter "
+          + "amrmProxyFederationPolicy should be initialized in "
+          + this.getClass().getSimpleName() + " constructor.");
+    }
+
+    try {
+      return (FederationAMRMProxyPolicy) internalPolicyGetter(
+          federationPolicyContext, oldInstance, amrmProxyFederationPolicy);
+    } catch (ClassCastException e) {
+      throw new FederationPolicyInitializationException(e);
+    }
+
+  }
+
+  /**
+   * This default implementation validates the
+   * {@link FederationPolicyInitializationContext},
+   * then checks whether it needs to reinstantiate the class (null or
+   * mismatching type), and reinitialize the policy.
+   *
+   * @param federationPolicyContext the current context
+   * @param oldInstance             the existing (possibly null) instance.
+   *
+   * @return a valid and fully reinitalized {@link FederationRouterPolicy}
+   * instance
+   *
+   * @throws FederationPolicyInitializationException if the reinitalization is
+   *                                                 not valid, and ensure
+   *                                                 previous state is preserved
+   */
+
+  public FederationRouterPolicy getRouterPolicy(
+      FederationPolicyInitializationContext federationPolicyContext,
+      FederationRouterPolicy oldInstance)
+      throws FederationPolicyInitializationException {
+
+    //checks that sub-types properly initialize the types of policies
+    if (routerFederationPolicy == null) {
+      throw new FederationPolicyInitializationException("The policy "
+          + "type should be initialized in " + this.getClass().getSimpleName()
+          + " constructor.");
+    }
+
+    try {
+      return (FederationRouterPolicy) internalPolicyGetter(
+          federationPolicyContext, oldInstance, routerFederationPolicy);
+    } catch (ClassCastException e) {
+      throw new FederationPolicyInitializationException(e);
+    }
+  }
+
+  @Override
+  public SubClusterPolicyConfiguration serializeConf()
+      throws FederationPolicyInitializationException {
+    // default implementation works only for sub-classes which do not require
+    // any parameters
+    ByteBuffer buf = ByteBuffer.allocate(0);
+    return SubClusterPolicyConfiguration
+        .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
+  }
+
+  @Override
+  public String getQueue() {
+    return queue;
+  }
+
+  @Override
+  public void setQueue(String queue) {
+    this.queue = queue;
+  }
+
+  /**
+   * Common functionality to instantiate a reinitialize a {@link
+   * ConfigurableFederationPolicy}.
+   */
+  private ConfigurableFederationPolicy internalPolicyGetter(
+      final FederationPolicyInitializationContext federationPolicyContext,
+      ConfigurableFederationPolicy oldInstance, Class policy)
+      throws FederationPolicyInitializationException {
+
+    FederationPolicyInitializationContextValidator
+        .validate(federationPolicyContext, this.getClass().getCanonicalName());
+
+    if (oldInstance == null || !oldInstance.getClass().equals(policy)) {
+      try {
+        oldInstance = (ConfigurableFederationPolicy) policy.newInstance();
+      } catch (InstantiationException e) {
+        throw new FederationPolicyInitializationException(e);
+      } catch (IllegalAccessException e) {
+        throw new FederationPolicyInitializationException(e);
+      }
+    }
+
+    //copying the context to avoid side-effects
+    FederationPolicyInitializationContext modifiedContext =
+        updateContext(federationPolicyContext,
+            oldInstance.getClass().getCanonicalName());
+
+    oldInstance.reinitialize(modifiedContext);
+    return oldInstance;
+  }
+
+  /**
+   * This method is used to copy-on-write the context, that will be passed
+   * downstream to the router/amrmproxy policies.
+   */
+  private FederationPolicyInitializationContext updateContext(
+      FederationPolicyInitializationContext federationPolicyContext,
+      String type) {
+    // copying configuration and context to avoid modification of original
+    SubClusterPolicyConfiguration newConf = SubClusterPolicyConfiguration
+        .newInstance(federationPolicyContext
+            .getSubClusterPolicyConfiguration());
+    newConf.setType(type);
+
+    return new FederationPolicyInitializationContext(newConf,
+                  federationPolicyContext.getFederationSubclusterResolver(),
+                  federationPolicyContext.getFederationStateStoreFacade(),
+                  federationPolicyContext.getHomeSubcluster());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java
new file mode 100644
index 0000000..1434c80
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+/**
+ *
+ * Implementors need to provide the ability to serliaze a policy and its
+ * configuration as a {@link SubClusterPolicyConfiguration}, as well as provide
+ * (re)initialization mechanics for the underlying
+ * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}.
+ *
+ * The serialization aspects are used by admin APIs or a policy engine to store
+ * a serialized configuration in the {@code FederationStateStore}, while the
+ * getters methods are used to obtain a propertly inizialized policy in the
+ * {@code Router} and {@code AMRMProxy} respectively.
+ *
+ * This interface by design binds together {@link FederationAMRMProxyPolicy} and
+ * {@link FederationRouterPolicy} and provide lifecycle support for
+ * serialization and deserialization, to reduce configuration mistakes
+ * (combining incompatible policies).
+ *
+ */
+public interface FederationPolicyManager {
+
+  /**
+   * If the current instance is compatible, this method returns the same
+   * instance of {@link FederationAMRMProxyPolicy} reinitialized with the
+   * current context, otherwise a new instance initialized with the current
+   * context is provided. If the instance is compatible with the current class
+   * the implementors should attempt to reinitalize (retaining state). To affect
+   * a complete policy reset oldInstance should be null.
+   *
+   * @param policyContext the current context
+   * @param oldInstance the existing (possibly null) instance.
+   *
+   * @return an updated {@link FederationAMRMProxyPolicy }.
+   *
+   * @throws FederationPolicyInitializationException if the initialization
+   *           cannot be completed properly. The oldInstance should be still
+   *           valid in case of failed initialization.
+   */
+  FederationAMRMProxyPolicy getAMRMPolicy(
+      FederationPolicyInitializationContext policyContext,
+      FederationAMRMProxyPolicy oldInstance)
+      throws FederationPolicyInitializationException;
+
+  /**
+   * If the current instance is compatible, this method returns the same
+   * instance of {@link FederationRouterPolicy} reinitialized with the current
+   * context, otherwise a new instance initialized with the current context is
+   * provided. If the instance is compatible with the current class the
+   * implementors should attempt to reinitalize (retaining state). To affect a
+   * complete policy reset oldInstance shoulb be set to null.
+   *
+   * @param policyContext the current context
+   * @param oldInstance the existing (possibly null) instance.
+   *
+   * @return an updated {@link FederationRouterPolicy}.
+   *
+   * @throws FederationPolicyInitializationException if the initalization cannot
+   *           be completed properly. The oldInstance should be still valid in
+   *           case of failed initialization.
+   */
+  FederationRouterPolicy getRouterPolicy(
+      FederationPolicyInitializationContext policyContext,
+      FederationRouterPolicy oldInstance)
+      throws FederationPolicyInitializationException;
+
+  /**
+   * This method is invoked to derive a {@link SubClusterPolicyConfiguration}.
+   * This is to be used when writing a policy object in the federation policy
+   * store.
+   *
+   * @return a valid policy configuration representing this object
+   *         parametrization.
+   *
+   * @throws FederationPolicyInitializationException if the current state cannot
+   *           be serialized properly
+   */
+  SubClusterPolicyConfiguration serializeConf()
+      throws FederationPolicyInitializationException;
+
+  /**
+   * This method returns the queue this policy is configured for.
+   *
+   * @return the name of the queue.
+   */
+  String getQueue();
+
+  /**
+   * This methods provides a setter for the queue this policy is specified for.
+   *
+   * @param queue the name of the queue.
+   */
+  void setQueue(String queue);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java
new file mode 100644
index 0000000..08ab08f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.HashBasedRouterPolicy;
+
+/**
+ * Policy that routes applications via hashing of their queuename, and broadcast
+ * resource requests. This picks a {@link HashBasedRouterPolicy} for the router
+ * and a {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed
+ * to work together.
+ */
+public class HashBroadcastPolicyManager extends AbstractPolicyManager {
+
+  public HashBroadcastPolicyManager() {
+    // this structurally hard-codes two compatible policies for Router and
+    // AMRMProxy.
+    routerFederationPolicy = HashBasedRouterPolicy.class;
+    amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java
new file mode 100644
index 0000000..8139e12
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Policy that allows operator to configure "weights" for routing. This picks a
+ * {@link PriorityRouterPolicy} for the router and a
+ * {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed to
+ * work together.
+ */
+public class PriorityBroadcastPolicyManager extends AbstractPolicyManager {
+
+  private WeightedPolicyInfo weightedPolicyInfo;
+
+  public PriorityBroadcastPolicyManager() {
+    // this structurally hard-codes two compatible policies for Router and
+    // AMRMProxy.
+    routerFederationPolicy = PriorityRouterPolicy.class;
+    amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+    weightedPolicyInfo = new WeightedPolicyInfo();
+  }
+
+  @Override
+  public SubClusterPolicyConfiguration serializeConf()
+      throws FederationPolicyInitializationException {
+    ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
+    return SubClusterPolicyConfiguration.newInstance(getQueue(),
+        this.getClass().getCanonicalName(), buf);
+  }
+
+  @VisibleForTesting
+  public WeightedPolicyInfo getWeightedPolicyInfo() {
+    return weightedPolicyInfo;
+  }
+
+  @VisibleForTesting
+  public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
+    this.weightedPolicyInfo = weightedPolicyInfo;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java
new file mode 100644
index 0000000..5db0466
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
+
+/**
+ * This class represents a simple implementation of a {@code
+ * FederationPolicyManager}.
+ *
+ * It combines the basic policies: {@link UniformRandomRouterPolicy} and
+ * {@link BroadcastAMRMProxyPolicy}, which are designed to work together and
+ * "spread" the load among sub-clusters uniformly.
+ *
+ * This simple policy might impose heavy load on the RMs and return more
+ * containers than a job requested as all requests are (replicated and)
+ * broadcasted.
+ */
+public class UniformBroadcastPolicyManager extends AbstractPolicyManager {
+
+  public UniformBroadcastPolicyManager() {
+    // this structurally hard-codes two compatible policies for Router and
+    // AMRMProxy.
+    routerFederationPolicy = UniformRandomRouterPolicy.class;
+    amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java
new file mode 100644
index 0000000..109b534
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Policy that allows operator to configure "weights" for routing. This picks a
+ * {@link WeightedRandomRouterPolicy} for the router and a {@link
+ * LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to
+ * work together.
+ */
+public class WeightedLocalityPolicyManager
+    extends AbstractPolicyManager {
+
+  private WeightedPolicyInfo weightedPolicyInfo;
+
+  public WeightedLocalityPolicyManager() {
+    //this structurally hard-codes two compatible policies for Router and
+    // AMRMProxy.
+    routerFederationPolicy =  WeightedRandomRouterPolicy.class;
+    amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class;
+    weightedPolicyInfo = new WeightedPolicyInfo();
+  }
+
+  @Override
+  public SubClusterPolicyConfiguration serializeConf()
+      throws FederationPolicyInitializationException {
+    ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
+    return SubClusterPolicyConfiguration
+        .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
+  }
+
+  @VisibleForTesting
+  public WeightedPolicyInfo getWeightedPolicyInfo() {
+    return weightedPolicyInfo;
+  }
+
+  @VisibleForTesting
+  public void setWeightedPolicyInfo(
+      WeightedPolicyInfo weightedPolicyInfo) {
+    this.weightedPolicyInfo = weightedPolicyInfo;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java
new file mode 100644
index 0000000..9515c01
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Various implementation of FederationPolicyManager. **/
+package org.apache.hadoop.yarn.server.federation.policies.manager;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
index f49af1d..730fb41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.federation.policies.router;
 
 import java.util.Map;
 
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 
@@ -44,4 +47,20 @@ public abstract class AbstractRouterPolicy extends
     }
   }
 
+  public void validate(ApplicationSubmissionContext appSubmissionContext)
+      throws FederationPolicyException {
+
+    if (appSubmissionContext == null) {
+      throw new FederationPolicyException(
+          "Cannot route an application with null context.");
+    }
+
+    // if the queue is not specified we set it to default value, to be
+    // compatible with YARN behavior.
+    String queue = appSubmissionContext.getQueue();
+    if (queue == null) {
+      appSubmissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
new file mode 100644
index 0000000..e40e87e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+/**
+ * This {@link FederationRouterPolicy} pick a subcluster based on the hash of
+ * the job's queue name. Useful to provide a default behavior when too many
+ * queues exist in a system. This also ensures that all jobs belonging to a
+ * queue are mapped to the same sub-cluster (likely help with locality).
+ */
+public class HashBasedRouterPolicy extends AbstractRouterPolicy {
+
+  @Override
+  public void reinitialize(
+      FederationPolicyInitializationContext federationPolicyContext)
+      throws FederationPolicyInitializationException {
+    FederationPolicyInitializationContextValidator
+        .validate(federationPolicyContext, this.getClass().getCanonicalName());
+
+    // note: this overrides BaseRouterPolicy and ignores the weights
+    setPolicyContext(federationPolicyContext);
+  }
+
+  /**
+   * Simply picks from alphabetically-sorted active subclusters based on the
+   * hash of quey name. Jobs of the same queue will all be routed to the same
+   * sub-cluster, as far as the number of active sub-cluster and their names
+   * remain the same.
+   *
+   * @param appSubmissionContext the context for the app being submitted.
+   *
+   * @return a hash-based chosen subcluster.
+   *
+   * @throws YarnException if there are no active subclusters.
+   */
+  public SubClusterId getHomeSubcluster(
+      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+
+    // throws if no active subclusters available
+    Map<SubClusterId, SubClusterInfo> activeSubclusters =
+        getActiveSubclusters();
+
+    validate(appSubmissionContext);
+
+    int chosenPosition = Math.abs(
+        appSubmissionContext.getQueue().hashCode() % activeSubclusters.size());
+
+    List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
+    Collections.sort(list);
+    return list.get(chosenPosition);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
index 5de749f..2ca15bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
@@ -64,6 +64,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
   public SubClusterId getHomeSubcluster(
       ApplicationSubmissionContext appSubmissionContext) throws YarnException {
 
+    // null checks and default-queue behavior
+    validate(appSubmissionContext);
+
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
index bc3a1f7..13d9140 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
@@ -36,6 +36,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
   public SubClusterId getHomeSubcluster(
       ApplicationSubmissionContext appSubmissionContext) throws YarnException {
 
+    // null checks and default-queue behavior
+    validate(appSubmissionContext);
+
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
index b8f9cc3..d820449 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
@@ -48,11 +48,10 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
   }
 
   @Override
-  public void reinitialize(
-      FederationPolicyInitializationContext policyContext)
+  public void reinitialize(FederationPolicyInitializationContext policyContext)
       throws FederationPolicyInitializationException {
-    FederationPolicyInitializationContextValidator
-        .validate(policyContext, this.getClass().getCanonicalName());
+    FederationPolicyInitializationContextValidator.validate(policyContext,
+        this.getClass().getCanonicalName());
 
     // note: this overrides AbstractRouterPolicy and ignores the weights
 
@@ -73,6 +72,9 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
   public SubClusterId getHomeSubcluster(
       ApplicationSubmissionContext appSubmissionContext) throws YarnException {
 
+    // null checks and default-queue behavior
+    validate(appSubmissionContext);
+
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
index ac75ae9..5727134 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
@@ -43,6 +43,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
   public SubClusterId getHomeSubcluster(
       ApplicationSubmissionContext appSubmissionContext) throws YarnException {
 
+    // null checks and default-queue behavior
+    validate(appSubmissionContext);
+
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
index ba897da..6bd8bf0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.federation.policies;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -35,8 +36,10 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
 import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
 import org.junit.Test;
 
@@ -46,7 +49,7 @@ import org.junit.Test;
 public abstract class BaseFederationPoliciesTest {
 
   private ConfigurableFederationPolicy policy;
-  private WeightedPolicyInfo policyInfo;
+  private WeightedPolicyInfo policyInfo = mock(WeightedPolicyInfo.class);
   private Map<SubClusterId, SubClusterInfo> activeSubclusters = new HashMap<>();
   private FederationPolicyInitializationContext federationPolicyContext;
   private ApplicationSubmissionContext applicationSubmissionContext =
@@ -103,7 +106,7 @@ public abstract class BaseFederationPoliciesTest {
       ((FederationRouterPolicy) localPolicy)
           .getHomeSubcluster(getApplicationSubmissionContext());
     } else {
-      String[] hosts = new String[] {"host1", "host2" };
+      String[] hosts = new String[] {"host1", "host2"};
       List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
           .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
       ((FederationAMRMProxyPolicy) localPolicy)
@@ -170,4 +173,14 @@ public abstract class BaseFederationPoliciesTest {
     this.homeSubCluster = homeSubCluster;
   }
 
+  public void setMockActiveSubclusters(int numSubclusters) {
+    for (int i = 1; i <= numSubclusters; i++) {
+      SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
+      SubClusterInfo sci = mock(SubClusterInfo.class);
+      when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
+      when(sci.getSubClusterId()).thenReturn(sc.toId());
+      getActiveSubclusters().put(sc.toId(), sci);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
deleted file mode 100644
index c609886..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * This class provides common test methods for testing {@code
- * FederationPolicyManager}s.
- */
-public abstract class BasePolicyManagerTest {
-
-
-  @SuppressWarnings("checkstyle:visibilitymodifier")
-  protected FederationPolicyManager wfp = null;
-  @SuppressWarnings("checkstyle:visibilitymodifier")
-  protected Class expectedPolicyManager;
-  @SuppressWarnings("checkstyle:visibilitymodifier")
-  protected Class expectedAMRMProxyPolicy;
-  @SuppressWarnings("checkstyle:visibilitymodifier")
-  protected Class expectedRouterPolicy;
-
-
-  @Test
-  public void testSerializeAndInstantiate() throws Exception {
-    serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
-        expectedAMRMProxyPolicy,
-        expectedRouterPolicy);
-  }
-
-  @Test(expected = FederationPolicyInitializationException.class)
-  public void testSerializeAndInstantiateBad1() throws Exception {
-    serializeAndDeserializePolicyManager(wfp, String.class,
-        expectedAMRMProxyPolicy, expectedRouterPolicy);
-  }
-
-  @Test(expected = AssertionError.class)
-  public void testSerializeAndInstantiateBad2() throws Exception {
-    serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
-        String.class, expectedRouterPolicy);
-  }
-
-  @Test(expected = AssertionError.class)
-  public void testSerializeAndInstantiateBad3() throws Exception {
-    serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
-        expectedAMRMProxyPolicy, String.class);
-  }
-
-  protected static void serializeAndDeserializePolicyManager(
-      FederationPolicyManager wfp, Class policyManagerType,
-      Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception {
-
-    // serializeConf it in a context
-    SubClusterPolicyConfiguration fpc =
-        wfp.serializeConf();
-    fpc.setType(policyManagerType.getCanonicalName());
-    FederationPolicyInitializationContext context = new
-        FederationPolicyInitializationContext();
-    context.setSubClusterPolicyConfiguration(fpc);
-    context
-        .setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
-    context.setFederationSubclusterResolver(
-        FederationPoliciesTestUtil.initResolver());
-    context.setHomeSubcluster(SubClusterId.newInstance("homesubcluster"));
-
-    // based on the "context" created instantiate new class and use it
-    Class c = Class.forName(wfp.getClass().getCanonicalName());
-    FederationPolicyManager wfp2 = (FederationPolicyManager) c.newInstance();
-
-    FederationAMRMProxyPolicy federationAMRMProxyPolicy =
-        wfp2.getAMRMPolicy(context, null);
-
-    //needed only for tests (getARMRMPolicy change the "type" in conf)
-    fpc.setType(wfp.getClass().getCanonicalName());
-
-    FederationRouterPolicy federationRouterPolicy =
-        wfp2.getRouterPolicy(context, null);
-
-    Assert.assertEquals(federationAMRMProxyPolicy.getClass(),
-        expAMRMProxyPolicy);
-
-    Assert.assertEquals(federationRouterPolicy.getClass(),
-        expRouterPolicy);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
index d906b92..611a486 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
 import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32ffa9e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java
deleted file mode 100644
index 5e5bc83..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
-import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Simple test of {@link PriorityBroadcastPolicyManager}.
- */
-public class TestPriorityBroadcastPolicyManager extends BasePolicyManagerTest {
-
-  private WeightedPolicyInfo policyInfo;
-
-  @Before
-  public void setup() {
-    // configure a policy
-
-    wfp = new PriorityBroadcastPolicyManager();
-    wfp.setQueue("queue1");
-    SubClusterId sc1 = SubClusterId.newInstance("sc1");
-    SubClusterId sc2 = SubClusterId.newInstance("sc2");
-    policyInfo = new WeightedPolicyInfo();
-
-    Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
-    routerWeights.put(new SubClusterIdInfo(sc1), 0.2f);
-    routerWeights.put(new SubClusterIdInfo(sc2), 0.8f);
-    policyInfo.setRouterPolicyWeights(routerWeights);
-
-    ((PriorityBroadcastPolicyManager) wfp).setWeightedPolicyInfo(policyInfo);
-
-    // set expected params that the base test class will use for tests
-    expectedPolicyManager = PriorityBroadcastPolicyManager.class;
-    expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class;
-    expectedRouterPolicy = PriorityRouterPolicy.class;
-  }
-
-  @Test
-  public void testPolicyInfoSetCorrectly() throws Exception {
-    serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
-        expectedAMRMProxyPolicy, expectedRouterPolicy);
-
-    // check the policyInfo propagates through ser/der correctly
-    Assert.assertEquals(
-        ((PriorityBroadcastPolicyManager) wfp).getWeightedPolicyInfo(),
-        policyInfo);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org