You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/08/02 00:32:32 UTC

[38/50] [abbrv] hadoop git commit: YARN-6724. Add ability to blacklist sub-clusters when invoking Routing policies. (Giovanni Matteo Fumarola via Subru).

YARN-6724. Add ability to blacklist sub-clusters when invoking Routing policies. (Giovanni Matteo Fumarola via Subru).

(cherry picked from commit f8e5de59697cb78686f0e605dc7e93628b5f3297)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4cfec943
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4cfec943
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4cfec943

Branch: refs/heads/trunk
Commit: 4cfec943b177e2123a935e70d39776521883c2bc
Parents: 433ee44
Author: Subru Krishnan <su...@apache.org>
Authored: Wed Jun 21 19:08:47 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Tue Aug 1 17:28:26 2017 -0700

----------------------------------------------------------------------
 .../federation/policies/RouterPolicyFacade.java | 15 +++++--
 .../policies/router/FederationRouterPolicy.java | 18 +++++---
 .../policies/router/HashBasedRouterPolicy.java  | 22 ++++++++--
 .../policies/router/LoadBasedRouterPolicy.java  |  7 +++-
 .../policies/router/PriorityRouterPolicy.java   |  7 +++-
 .../policies/router/RejectRouterPolicy.java     | 26 ++++++++----
 .../router/UniformRandomRouterPolicy.java       | 23 +++++++++--
 .../router/WeightedRandomRouterPolicy.java      | 11 ++++-
 .../policies/BaseFederationPoliciesTest.java    |  2 +-
 .../policies/TestRouterPolicyFacade.java        | 12 +++---
 .../policies/router/BaseRouterPoliciesTest.java | 43 +++++++++++++++++++-
 .../router/TestHashBasedRouterPolicy.java       |  2 +-
 .../router/TestLoadBasedRouterPolicy.java       |  2 +-
 .../router/TestPriorityRouterPolicy.java        |  2 +-
 .../policies/router/TestRejectRouterPolicy.java |  4 +-
 .../router/TestUniformRandomRouterPolicy.java   |  2 +-
 .../router/TestWeightedRandomRouterPolicy.java  |  2 +-
 17 files changed, 157 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
index 5e31a08..44c1b10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.federation.policies;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -110,16 +111,22 @@ public class RouterPolicyFacade {
    * This method provides a wrapper of all policy functionalities for routing .
    * Internally it manages configuration changes, and policy init/reinit.
    *
-   * @param appSubmissionContext the application to route.
+   * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
+   *          has to be routed to an appropriate subCluster for execution.
    *
-   * @return the id of the subcluster that will be the "home" for this
+   * @param blackListSubClusters the list of subClusters as identified by
+   *          {@link SubClusterId} to blackList from the selection of the home
+   *          subCluster.
+   *
+   * @return the {@link SubClusterId} that will be the "home" for this
    *         application.
    *
    * @throws YarnException if there are issues initializing policies, or no
    *           valid sub-cluster id could be found for this app.
    */
   public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+      ApplicationSubmissionContext appSubmissionContext,
+      List<SubClusterId> blackListSubClusters) throws YarnException {
 
     // the maps are concurrent, but we need to protect from reset()
     // reinitialization mid-execution by creating a new reference local to this
@@ -186,7 +193,7 @@ public class RouterPolicyFacade {
           + "and no default specified.");
     }
 
-    return policy.getHomeSubcluster(appSubmissionContext);
+    return policy.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
index 90ea0a8..9325bd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import java.util.List;
+
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
@@ -29,16 +31,22 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 public interface FederationRouterPolicy extends ConfigurableFederationPolicy {
 
   /**
-   * Determines the sub-cluster that the user application submision should be
+   * Determines the sub-cluster that the user application submission should be
    * routed to.
    *
-   * @param appSubmissionContext the context for the app being submitted.
+   * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
+   *          has to be routed to an appropriate subCluster for execution.
+   *
+   * @param blackListSubClusters the list of subClusters as identified by
+   *          {@link SubClusterId} to blackList from the selection of the home
+   *          subCluster.
    *
-   * @return the sub-cluster as identified by {@link SubClusterId} to route the
-   *         request to.
+   * @return the {@link SubClusterId} that will be the "home" for this
+   *         application.
    *
    * @throws YarnException if the policy cannot determine a viable subcluster.
    */
   SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext) throws YarnException;
+      ApplicationSubmissionContext appSubmissionContext,
+      List<SubClusterId> blackListSubClusters) throws YarnException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
index e40e87e..257a9fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
@@ -55,19 +55,35 @@ public class HashBasedRouterPolicy extends AbstractRouterPolicy {
    * sub-cluster, as far as the number of active sub-cluster and their names
    * remain the same.
    *
-   * @param appSubmissionContext the context for the app being submitted.
+   * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
+   *          has to be routed to an appropriate subCluster for execution.
    *
-   * @return a hash-based chosen subcluster.
+   * @param blackListSubClusters the list of subClusters as identified by
+   *          {@link SubClusterId} to blackList from the selection of the home
+   *          subCluster.
+   *
+   * @return a hash-based chosen {@link SubClusterId} that will be the "home"
+   *         for this application.
    *
    * @throws YarnException if there are no active subclusters.
    */
+  @Override
   public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+      ApplicationSubmissionContext appSubmissionContext,
+      List<SubClusterId> blackListSubClusters) throws YarnException {
 
     // throws if no active subclusters available
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 
+    if (blackListSubClusters != null) {
+
+      // Remove the blacklisted SubClusters from the StateStore's active ones
+      for (SubClusterId scId : blackListSubClusters) {
+        activeSubclusters.remove(scId);
+      }
+    }
+
     validate(appSubmissionContext);
 
     int chosenPosition = Math.abs(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
index 2ca15bf..c124001 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -62,7 +63,8 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
 
   @Override
   public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+      ApplicationSubmissionContext appSubmissionContext,
+      List<SubClusterId> blacklist) throws YarnException {
 
     // null checks and default-queue behavior
     validate(appSubmissionContext);
@@ -76,6 +78,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
     long currBestMem = -1;
     for (Map.Entry<SubClusterId, SubClusterInfo> entry : activeSubclusters
         .entrySet()) {
+      if (blacklist != null && blacklist.contains(entry.getKey())) {
+        continue;
+      }
       SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey());
       if (weights.containsKey(id) && weights.get(id) > 0) {
         long availableMemory = getAvailableMemory(entry.getValue());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
index 13d9140..59f8767 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -34,7 +35,8 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
 
   @Override
   public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+      ApplicationSubmissionContext appSubmissionContext,
+      List<SubClusterId> blacklist) throws YarnException {
 
     // null checks and default-queue behavior
     validate(appSubmissionContext);
@@ -50,6 +52,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
     Float currentBest = Float.MIN_VALUE;
     for (SubClusterId id : activeSubclusters.keySet()) {
       SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
+      if (blacklist != null && blacklist.contains(id)) {
+        continue;
+      }
       if (weights.containsKey(idInfo) && weights.get(idInfo) > currentBest) {
         currentBest = weights.get(idInfo);
         chosen = id;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java
index faf3279..b4c0192 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import java.util.List;
+
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -27,8 +29,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 
 /**
  * This {@link FederationRouterPolicy} simply rejects all incoming requests.
- * This is useful to prevent applications running in a queue to be run
- * anywhere in the federated cluster.
+ * This is useful to prevent applications running in a queue from being run
+ * anywhere in the federated cluster.
  */
 public class RejectRouterPolicy extends AbstractRouterPolicy {
 
@@ -44,23 +46,31 @@ public class RejectRouterPolicy extends AbstractRouterPolicy {
   /**
    * The policy always reject requests.
    *
-   * @param appSubmissionContext the context for the app being submitted.
+   * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
+   *          has to be routed to an appropriate subCluster for execution.
+   *
+   * @param blackListSubClusters the list of subClusters as identified by
+   *          {@link SubClusterId} to blackList from the selection of the home
+   *          subCluster.
    *
    * @return (never).
    *
-   * @throws YarnException (always) to prevent applications in this queue to
-   * be run anywhere in the federated cluster.
+   * @throws YarnException (always) to prevent applications in this queue from
+   *           being run anywhere in the federated cluster.
    */
+  @Override
   public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+      ApplicationSubmissionContext appSubmissionContext,
+      List<SubClusterId> blackListSubClusters) throws YarnException {
 
     // run standard validation, as error might differ
     validate(appSubmissionContext);
 
     throw new FederationPolicyException("The policy configured for this queue"
         + " (" + appSubmissionContext.getQueue() + ") reject all routing "
-        + "requests by construction. Application " + appSubmissionContext
-        .getApplicationId() + " cannot be routed to any RM.");
+        + "requests by construction. Application "
+        + appSubmissionContext.getApplicationId()
+        + " cannot be routed to any RM.");
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
index d820449..bc729b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
@@ -59,18 +59,24 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
   }
 
   /**
-   * Simply picks a random active subcluster to start the AM (this does NOT
+   * Simply picks a random active subCluster to start the AM (this does NOT
    * depend on the weights in the policy).
    *
-   * @param appSubmissionContext the context for the app being submitted
-   *          (ignored).
+   * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
+   *          has to be routed to an appropriate subCluster for execution.
+   *
+   * @param blackListSubClusters the list of subClusters as identified by
+   *          {@link SubClusterId} to blackList from the selection of the home
+   *          subCluster.
    *
    * @return a randomly chosen subcluster.
    *
    * @throws YarnException if there are no active subclusters.
    */
+  @Override
   public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+      ApplicationSubmissionContext appSubmissionContext,
+      List<SubClusterId> blackListSubClusters) throws YarnException {
 
     // null checks and default-queue behavior
     validate(appSubmissionContext);
@@ -79,6 +85,15 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
         getActiveSubclusters();
 
     List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
+
+    if (blackListSubClusters != null) {
+
+      // Remove the blacklisted SubClusters from the StateStore's active ones
+      for (SubClusterId scId : blackListSubClusters) {
+        list.remove(scId);
+      }
+    }
+
     return list.get(rand.nextInt(list.size()));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
index 5727134..7f230a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -41,7 +42,8 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
 
   @Override
   public SubClusterId getHomeSubcluster(
-      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+      ApplicationSubmissionContext appSubmissionContext,
+      List<SubClusterId> blacklist) throws YarnException {
 
     // null checks and default-queue behavior
     validate(appSubmissionContext);
@@ -58,6 +60,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
 
     float totActiveWeight = 0;
     for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
+      if (blacklist != null && blacklist.contains(entry.getKey().toId())) {
+        continue;
+      }
       if (entry.getKey() != null
           && activeSubclusters.containsKey(entry.getKey().toId())) {
         totActiveWeight += entry.getValue();
@@ -66,6 +71,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
     float lookupValue = rand.nextFloat() * totActiveWeight;
 
     for (SubClusterId id : activeSubclusters.keySet()) {
+      if (blacklist != null && blacklist.contains(id)) {
+        continue;
+      }
       SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
       if (weights.containsKey(idInfo)) {
         lookupValue -= weights.get(idInfo);
@@ -77,4 +85,5 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
     // should never happen
     return null;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
index 6bd8bf0..23978ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
@@ -104,7 +104,7 @@ public abstract class BaseFederationPoliciesTest {
     ConfigurableFederationPolicy localPolicy = getPolicy();
     if (localPolicy instanceof FederationRouterPolicy) {
       ((FederationRouterPolicy) localPolicy)
-          .getHomeSubcluster(getApplicationSubmissionContext());
+          .getHomeSubcluster(getApplicationSubmissionContext(), null);
     } else {
       String[] hosts = new String[] {"host1", "host2"};
       List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
index 5fa02d6..d0e2dec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
@@ -95,7 +95,7 @@ public class TestRouterPolicyFacade {
 
     // first call runs using standard UniformRandomRouterPolicy
     SubClusterId chosen =
-        routerFacade.getHomeSubcluster(applicationSubmissionContext);
+        routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
     Assert.assertTrue(subClusterIds.contains(chosen));
     Assert.assertTrue(routerFacade.globalPolicyMap
         .get(queue1) instanceof UniformRandomRouterPolicy);
@@ -107,7 +107,7 @@ public class TestRouterPolicyFacade {
         .newInstance(getPriorityPolicy(queue1)));
 
     // second call is routed by new policy PriorityRouterPolicy
-    chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
+    chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
     Assert.assertTrue(chosen.equals(subClusterIds.get(0)));
     Assert.assertTrue(routerFacade.globalPolicyMap
         .get(queue1) instanceof PriorityRouterPolicy);
@@ -126,7 +126,7 @@ public class TestRouterPolicyFacade {
 
     // when invoked it returns the expected SubClusterId.
     SubClusterId chosen =
-        routerFacade.getHomeSubcluster(applicationSubmissionContext);
+        routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
     Assert.assertTrue(subClusterIds.contains(chosen));
 
     // now the caching of policies must have added an entry for this queue
@@ -160,19 +160,19 @@ public class TestRouterPolicyFacade {
     String uninitQueue = "non-initialized-queue";
     when(applicationSubmissionContext.getQueue()).thenReturn(uninitQueue);
     SubClusterId chosen =
-        routerFacade.getHomeSubcluster(applicationSubmissionContext);
+        routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
     Assert.assertTrue(subClusterIds.contains(chosen));
     Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
 
     // empty string
     when(applicationSubmissionContext.getQueue()).thenReturn("");
-    chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
+    chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
     Assert.assertTrue(subClusterIds.contains(chosen));
     Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
 
     // null queue also falls back to default
     when(applicationSubmissionContext.getQueue()).thenReturn(null);
-    chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
+    chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
     Assert.assertTrue(subClusterIds.contains(chosen));
     Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
index 2e7a0af..c7a7767 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
@@ -18,11 +18,19 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
@@ -40,12 +48,43 @@ public abstract class BaseRouterPoliciesTest
         ApplicationSubmissionContext.newInstance(null, null, null, null, null,
             false, false, 0, Resources.none(), null, false, null, null);
     SubClusterId chosen =
-        localPolicy.getHomeSubcluster(applicationSubmissionContext);
+        localPolicy.getHomeSubcluster(applicationSubmissionContext, null);
     Assert.assertNotNull(chosen);
   }
 
   @Test(expected = FederationPolicyException.class)
   public void testNullAppContext() throws YarnException {
-    ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null);
+    ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null, null);
+  }
+
+  @Test
+  public void testBlacklistSubcluster() throws YarnException {
+    FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy();
+    ApplicationSubmissionContext applicationSubmissionContext =
+        ApplicationSubmissionContext.newInstance(null, null, null, null, null,
+            false, false, 0, Resources.none(), null, false, null, null);
+    Map<SubClusterId, SubClusterInfo> activeSubClusters =
+        getActiveSubclusters();
+    if (activeSubClusters != null && activeSubClusters.size() > 1
+        && !(localPolicy instanceof RejectRouterPolicy)) {
+      // blacklist all the active subclusters but one.
+      Random random = new Random();
+      List<SubClusterId> blacklistSubclusters =
+          new ArrayList<SubClusterId>(activeSubClusters.keySet());
+      SubClusterId removed = blacklistSubclusters
+          .remove(random.nextInt(blacklistSubclusters.size()));
+      // weight the non-blacklisted sub-cluster to bias LoadBasedRouterPolicy
+      getPolicyInfo().getRouterPolicyWeights()
+          .put(new SubClusterIdInfo(removed), 1.0f);
+      FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+          getPolicyInfo(), getActiveSubclusters());
+
+      SubClusterId chosen = localPolicy.getHomeSubcluster(
+          applicationSubmissionContext, blacklistSubclusters);
+
+      // check that the selected sub-cluster is the only one not blacklisted
+      Assert.assertNotNull(chosen);
+      Assert.assertEquals(removed, chosen);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java
index af7fe43..ee3e09d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java
@@ -70,7 +70,7 @@ public class TestHashBasedRouterPolicy extends BaseRouterPoliciesTest {
     for (int i = 0; i < jobPerSub * numSubclusters; i++) {
       when(applicationSubmissionContext.getQueue()).thenReturn("queue" + i);
       chosen = ((FederationRouterPolicy) getPolicy())
-          .getHomeSubcluster(applicationSubmissionContext);
+          .getHomeSubcluster(applicationSubmissionContext, null);
       counter.get(chosen).addAndGet(1);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
index b70b4aa..dc8f99b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
@@ -97,7 +97,7 @@ public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
   public void testLoadIsRespected() throws YarnException {
 
     SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
-        .getHomeSubcluster(getApplicationSubmissionContext());
+        .getHomeSubcluster(getApplicationSubmissionContext(), null);
 
     // check the "planted" best cluster is chosen
     Assert.assertEquals("sc05", chosen.getId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
index 42d919d..3c036c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
@@ -78,7 +78,7 @@ public class TestPriorityRouterPolicy extends BaseRouterPoliciesTest {
   @Test
   public void testPickLowestWeight() throws YarnException {
     SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
-        .getHomeSubcluster(getApplicationSubmissionContext());
+        .getHomeSubcluster(getApplicationSubmissionContext(), null);
     Assert.assertEquals("sc5", chosen.getId());
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java
index 049ebbf..1747f73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java
@@ -47,7 +47,7 @@ public class TestRejectRouterPolicy extends BaseRouterPoliciesTest {
   @Test(expected = FederationPolicyException.class)
   public void testNoClusterIsChosen() throws YarnException {
     ((FederationRouterPolicy) getPolicy())
-        .getHomeSubcluster(getApplicationSubmissionContext());
+        .getHomeSubcluster(getApplicationSubmissionContext(), null);
   }
 
   @Override
@@ -57,7 +57,7 @@ public class TestRejectRouterPolicy extends BaseRouterPoliciesTest {
     ApplicationSubmissionContext applicationSubmissionContext =
         ApplicationSubmissionContext.newInstance(null, null, null, null, null,
             false, false, 0, Resources.none(), null, false, null, null);
-    localPolicy.getHomeSubcluster(applicationSubmissionContext);
+    localPolicy.getHomeSubcluster(applicationSubmissionContext, null);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
index b45aa2a..05490ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
@@ -57,7 +57,7 @@ public class TestUniformRandomRouterPolicy extends BaseRouterPoliciesTest {
   @Test
   public void testOneSubclusterIsChosen() throws YarnException {
     SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
-        .getHomeSubcluster(getApplicationSubmissionContext());
+        .getHomeSubcluster(getApplicationSubmissionContext(), null);
     Assert.assertTrue(getActiveSubclusters().keySet().contains(chosen));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
index 09173e6..c969a30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
@@ -98,7 +98,7 @@ public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest {
 
     for (float i = 0; i < numberOfDraws; i++) {
       SubClusterId chosenId = ((FederationRouterPolicy) getPolicy())
-          .getHomeSubcluster(getApplicationSubmissionContext());
+          .getHomeSubcluster(getApplicationSubmissionContext(), null);
       counter.get(chosenId).incrementAndGet();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org