You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2020/01/31 01:00:08 UTC

[hadoop] branch trunk updated: YARN-8982. [Router] Add locality policy. Contributed by Young Chen.

This is an automated email from the ASF dual-hosted git repository.

gifuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bf8686f  YARN-8982. [Router] Add locality policy. Contributed by Young Chen.
bf8686f is described below

commit bf8686f43f0176f51bdb9b41f63be6801f26413f
Author: Giovanni Matteo Fumarola <gi...@apache.org>
AuthorDate: Thu Jan 30 16:59:36 2020 -0800

    YARN-8982. [Router] Add locality policy. Contributed by Young Chen.
---
 .../federation/policies/FederationPolicyUtils.java |  11 +-
 .../manager/WeightedLocalityPolicyManager.java     |  11 +-
 .../policies/router/LocalityRouterPolicy.java      | 196 ++++++++++++++
 .../policies/BaseFederationPoliciesTest.java       |   4 +-
 .../manager/TestWeightedLocalityPolicyManager.java |   4 +-
 .../policies/router/TestLocalityRouterPolicy.java  | 282 +++++++++++++++++++++
 .../router/TestWeightedRandomRouterPolicy.java     |  18 +-
 7 files changed, 510 insertions(+), 16 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
index aaa2c43..3aeeca3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Utility class for Federation policy.
  */
@@ -48,7 +50,7 @@ public final class FederationPolicyUtils {
   public static final String NO_ACTIVE_SUBCLUSTER_AVAILABLE =
       "No active SubCluster available to submit the request.";
 
-  private static final Random RAND = new Random(System.currentTimeMillis());
+  private static Random rand = new Random(System.currentTimeMillis());
 
   /** Disable constructor. */
   private FederationPolicyUtils() {
@@ -223,7 +225,7 @@ public final class FederationPolicyUtils {
     if (totalWeight == 0) {
       return -1;
     }
-    float samplePoint = RAND.nextFloat() * totalWeight;
+    float samplePoint = rand.nextFloat() * totalWeight;
     int lastIndex = 0;
     for (i = 0; i < weights.size(); i++) {
       if (weights.get(i) > 0) {
@@ -239,4 +241,9 @@ public final class FederationPolicyUtils {
     // float rounding kicks in during subtractions
     return lastIndex;
   }
+
+  @VisibleForTesting
+  public static void setRand(long seed){
+    rand.setSeed(seed);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java
index 109b534..a144501 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java
@@ -17,18 +17,19 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.manager;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.LocalityRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
 
-import java.nio.ByteBuffer;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Policy that allows operator to configure "weights" for routing. This picks a
- * {@link WeightedRandomRouterPolicy} for the router and a {@link
+ * {@link LocalityRouterPolicy} for the router and a {@link
  * LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to
  * work together.
  */
@@ -40,7 +41,7 @@ public class WeightedLocalityPolicyManager
   public WeightedLocalityPolicyManager() {
     //this structurally hard-codes two compatible policies for Router and
     // AMRMProxy.
-    routerFederationPolicy =  WeightedRandomRouterPolicy.class;
+    routerFederationPolicy =  LocalityRouterPolicy.class;
     amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class;
     weightedPolicyInfo = new WeightedPolicyInfo();
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java
new file mode 100644
index 0000000..469240a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Collections;
+
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This policy selects the subcluster depending on the node where the Client
+ * wants to run its application.
+ *
+ * It succeeds if:
+ *
+ * - There are three AMContainerResourceRequests in the order
+ *   NODE, RACK, ANY
+ *
+ * It falls back to WeightedRandomRouterPolicy in case of:
+ *
+ * - Null or empty AMContainerResourceRequests;
+ *
+ * - A single AMContainerResourceRequest that has ANY as its ResourceName;
+ *
+ * - The node is in a blacklisted or inactive SubCluster.
+ *
+ * It fails if:
+ *
+ * - The node does not exist and RelaxLocality is False;
+ *
+ * - We have an invalid number (not 0, 1 or 3) of resource requests
+ */
+public class LocalityRouterPolicy extends WeightedRandomRouterPolicy {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(LocalityRouterPolicy.class);
+
+  private SubClusterResolver resolver;
+  private List<SubClusterId> enabledSCs;
+
+  @Override
+  public void reinitialize(FederationPolicyInitializationContext policyContext)
+      throws FederationPolicyInitializationException {
+    super.reinitialize(policyContext);
+    resolver = policyContext.getFederationSubclusterResolver();
+    Map<SubClusterIdInfo, Float> weights =
+        getPolicyInfo().getRouterPolicyWeights();
+    enabledSCs = new ArrayList<SubClusterId>();
+    for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
+      if (entry != null && entry.getValue() > 0) {
+        enabledSCs.add(entry.getKey().toId());
+      }
+    }
+  }
+
+  @Override
+  public SubClusterId getHomeSubcluster(
+      ApplicationSubmissionContext appSubmissionContext,
+      List<SubClusterId> blackListSubClusters) throws YarnException {
+
+    // null checks and default-queue behavior
+    validate(appSubmissionContext);
+
+    List<ResourceRequest> rrList =
+        appSubmissionContext.getAMContainerResourceRequests();
+
+    // Fast path for FailForward to WeightedRandomRouterPolicy
+    if (rrList == null || rrList.isEmpty() || (rrList.size() == 1
+        && ResourceRequest.isAnyLocation(rrList.get(0).getResourceName()))) {
+      return super
+          .getHomeSubcluster(appSubmissionContext, blackListSubClusters);
+    }
+
+    if (rrList.size() != 3) {
+      throw new FederationPolicyException(
+          "Invalid number of resource requests: " + rrList.size());
+    }
+
+    Map<SubClusterId, SubClusterInfo> activeSubClusters =
+        getActiveSubclusters();
+    List<SubClusterId> validSubClusters =
+        new ArrayList<>(activeSubClusters.keySet());
+    FederationPolicyUtils
+        .validateSubClusterAvailability(validSubClusters, blackListSubClusters);
+    if (blackListSubClusters != null) {
+      // Remove the blacklisted ones from the StateStore's active SubClusters
+      validSubClusters.removeAll(blackListSubClusters);
+    }
+
+    try {
+      // With three requests, this has been processed by the
+      // ResourceRequestInterceptorREST, and should have
+      // node, rack, and any
+      SubClusterId targetId = null;
+      ResourceRequest nodeRequest = null;
+      ResourceRequest rackRequest = null;
+      ResourceRequest anyRequest = null;
+      for (ResourceRequest rr : rrList) {
+        // Handle "node" requests
+        try {
+          targetId = resolver.getSubClusterForNode(rr.getResourceName());
+          nodeRequest = rr;
+        } catch (YarnException e) {
+          LOG.error("Cannot resolve node : {}", e.getLocalizedMessage());
+        }
+        // Handle "rack" requests
+        try {
+          resolver.getSubClustersForRack(rr.getResourceName());
+          rackRequest = rr;
+        } catch (YarnException e) {
+          LOG.error("Cannot resolve rack : {}", e.getLocalizedMessage());
+        }
+        // Handle "ANY" requests
+        if (ResourceRequest.isAnyLocation(rr.getResourceName())) {
+          anyRequest = rr;
+          continue;
+        }
+      }
+      if (nodeRequest == null) {
+        throw new YarnException("Missing node request");
+      }
+      if (rackRequest == null) {
+        throw new YarnException("Missing rack request");
+      }
+      if (anyRequest == null) {
+        throw new YarnException("Missing any request");
+      }
+      LOG.info(
+          "Node request: " + nodeRequest.getResourceName() + ", Rack request: "
+              + rackRequest.getResourceName() + ", Any request: " + anyRequest
+              .getResourceName());
+      // Handle "node" requests
+      if (validSubClusters.contains(targetId) && enabledSCs
+          .contains(targetId)) {
+        LOG.info("Node {} is in SubCluster: {}", nodeRequest.getResourceName(),
+            targetId);
+        return targetId;
+      } else {
+        throw new YarnException("The node " + nodeRequest.getResourceName()
+            + " is in a blacklist SubCluster or not active. ");
+      }
+    } catch (YarnException e) {
+      LOG.error("Validating resource requests failed, Falling back to "
+          + "WeightedRandomRouterPolicy placement: " + e.getMessage());
+      // FailForward to WeightedRandomRouterPolicy
+      // Overwrite request to use a default ANY
+      ResourceRequest amReq = Records.newRecord(ResourceRequest.class);
+      amReq.setPriority(appSubmissionContext.getPriority());
+      amReq.setResourceName(ResourceRequest.ANY);
+      amReq.setCapability(appSubmissionContext.getResource());
+      amReq.setNumContainers(1);
+      amReq.setRelaxLocality(true);
+      amReq.setNodeLabelExpression(
+          appSubmissionContext.getNodeLabelExpression());
+      amReq.setExecutionTypeRequest(
+          ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
+      appSubmissionContext
+          .setAMContainerResourceRequests(Collections.singletonList(amReq));
+      return super
+          .getHomeSubcluster(appSubmissionContext, blackListSubClusters);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
index 57d3c67..249efd3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
@@ -162,8 +162,8 @@ public abstract class BaseFederationPoliciesTest {
     return rand;
   }
 
-  public void setRand(Random rand) {
-    this.rand = rand;
+  public void setRand(long seed) {
+    this.rand.setSeed(seed);
   }
 
   public SubClusterId getHomeSubCluster() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java
index 51661473..d2ef7a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.federation.policies.manager;
 
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
-import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.LocalityRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.junit.Assert;
@@ -63,7 +63,7 @@ public class TestWeightedLocalityPolicyManager extends
     //set expected params that the base test class will use for tests
     expectedPolicyManager = WeightedLocalityPolicyManager.class;
     expectedAMRMProxyPolicy = LocalityMulticastAMRMProxyPolicy.class;
-    expectedRouterPolicy = WeightedRandomRouterPolicy.class;
+    expectedRouterPolicy = LocalityRouterPolicy.class;
   }
 
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java
new file mode 100644
index 0000000..0593932
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class to validate the correctness of LocalityRouterPolicy.
+ */
+public class TestLocalityRouterPolicy extends TestWeightedRandomRouterPolicy {
+
+  /*
+   * The MachineList for the default Resolver has the following nodes:
+   *
+   * node1<=>subcluster1
+   *
+   * node2<=>subcluster2
+   *
+   * noDE3<=>subcluster3
+   *
+   * node4<=>subcluster3
+   *
+   * subcluster0-rack0-host0<=>subcluster0
+   *
+   * Subcluster1-RACK1-HOST1<=>subcluster1
+   *
+   * SUBCLUSTER1-RACK1-HOST2<=>subcluster1
+   *
+   * SubCluster2-RACK3-HOST3<=>subcluster2
+   */
+
+  @Before
+  public void setUp() throws Exception {
+    setPolicy(new LocalityRouterPolicy());
+    setPolicyInfo(new WeightedPolicyInfo());
+
+    configureWeights(4);
+
+    initializePolicy(new YarnConfiguration());
+  }
+
+  private void initializePolicy(Configuration conf) throws YarnException {
+    setFederationPolicyContext(new FederationPolicyInitializationContext());
+    SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
+    getFederationPolicyContext().setFederationSubclusterResolver(resolver);
+    ByteBuffer buf = getPolicyInfo().toByteBuffer();
+    getFederationPolicyContext().setSubClusterPolicyConfiguration(
+        SubClusterPolicyConfiguration
+            .newInstance("queue1", getPolicy().getClass().getCanonicalName(),
+                buf));
+    getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster());
+    FederationPoliciesTestUtil
+        .initializePolicyContext(getFederationPolicyContext(), getPolicy(),
+            getPolicyInfo(), getActiveSubclusters(), conf);
+  }
+
+  /**
+   * This test validates the correctness when the request has 1 node and
+   * the node belongs to an active subcluster.
+   */
+  @Test
+  public void testNodeInActiveSubCluster() throws YarnException {
+    List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, "node1", Resource.newInstance(10, 1),
+            1));
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1),
+            1));
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, ResourceRequest.ANY,
+            Resource.newInstance(10, 1), 1));
+    ApplicationSubmissionContext asc = ApplicationSubmissionContext
+        .newInstance(null, null, null, null, null, false, false, 0,
+            Resources.none(), null, false, null, null);
+    asc.setAMContainerResourceRequests(requests);
+
+    SubClusterId chosen =
+        ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null);
+    // If node1 is active, we should choose the sub cluster with node1
+    if (getActiveSubclusters().containsKey(
+        getFederationPolicyContext().getFederationSubclusterResolver()
+            .getSubClusterForNode("node1").getId())) {
+      Assert.assertEquals(
+          getFederationPolicyContext().getFederationSubclusterResolver()
+              .getSubClusterForNode("node1"), chosen);
+    }
+    // Regardless, we should choose an active SubCluster
+    Assert.assertTrue(getActiveSubclusters().containsKey(chosen));
+  }
+
+  /**
+   * This test validates the correctness when the request has multiple
+   * ResourceRequests. The tests without ResourceRequests are done in
+   * TestWeightedRandomRouterPolicy.
+   */
+  @Test
+  public void testMultipleResourceRequests() throws YarnException {
+    List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, "node1", Resource.newInstance(10, 1),
+            1));
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, "node2", Resource.newInstance(10, 1),
+            1));
+    ApplicationSubmissionContext asc = ApplicationSubmissionContext
+        .newInstance(null, null, null, null, null, false, false, 0,
+            Resources.none(), null, false, null, null);
+    asc.setAMContainerResourceRequests(requests);
+    try {
+      ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null);
+      Assert.fail();
+    } catch (FederationPolicyException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid number of resource requests: "));
+    }
+  }
+
+  /**
+   * This test validates the correctness when the request has 1 node and
+   * the node does not exist in the Resolver MachineList file.
+   */
+  @Test
+  public void testNodeNotExists() throws YarnException {
+    List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
+    boolean relaxLocality = true;
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, "node5", Resource.newInstance(10, 1),
+            1, relaxLocality));
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1),
+            1));
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, ResourceRequest.ANY,
+            Resource.newInstance(10, 1), 1));
+    ApplicationSubmissionContext asc = ApplicationSubmissionContext
+        .newInstance(null, null, null, null, null, false, false, 0,
+            Resources.none(), null, false, null, null);
+    asc.setAMContainerResourceRequests(requests);
+
+    try {
+      ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null);
+    } catch (FederationPolicyException e) {
+      Assert.fail();
+    }
+  }
+
+  /**
+   * This test validates the correctness when the request has 1 node and
+   * the node is in a blacklisted subcluster.
+   */
+  @Test
+  public void testNodeInABlacklistSubCluster() throws YarnException {
+    // Blacklist SubCluster3
+    String subClusterToBlacklist = "subcluster3";
+    // Remember the current value of subcluster3
+    Float value =
+        getPolicyInfo().getRouterPolicyWeights().get(subClusterToBlacklist);
+    getPolicyInfo().getRouterPolicyWeights()
+        .put(new SubClusterIdInfo(subClusterToBlacklist), 0.0f);
+    initializePolicy(new YarnConfiguration());
+
+    FederationPoliciesTestUtil
+        .initializePolicyContext(getFederationPolicyContext(), getPolicy(),
+            getPolicyInfo(), getActiveSubclusters(), new Configuration());
+
+    List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
+    boolean relaxLocality = true;
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, "node4", Resource.newInstance(10, 1),
+            1, relaxLocality));
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1),
+            1));
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, ResourceRequest.ANY,
+            Resource.newInstance(10, 1), 1));
+    ApplicationSubmissionContext asc = ApplicationSubmissionContext
+        .newInstance(null, null, null, null, null, false, false, 0,
+            Resources.none(), null, false, null, null);
+    asc.setAMContainerResourceRequests(requests);
+
+    try {
+      SubClusterId targetId =
+          ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null);
+      // The selected subcluster must not be the same as the blacklisted one.
+      Assert.assertNotEquals(targetId.getId(), subClusterToBlacklist);
+    } catch (FederationPolicyException e) {
+      Assert.fail();
+    }
+
+    // Set again the previous value for the other tests
+    getPolicyInfo().getRouterPolicyWeights()
+        .put(new SubClusterIdInfo(subClusterToBlacklist), value);
+  }
+
+  /**
+   * This test validates the correctness when the request has 1 node and
+   * the node is not in the policy weights.
+   */
+  @Test
+  public void testNodeNotInPolicy() throws YarnException {
+    // Remove SubCluster3 from the policy weights
+    String subClusterToBlacklist = "subcluster3";
+    // Remember the current value of subcluster3
+    Float value =
+        getPolicyInfo().getRouterPolicyWeights().get(subClusterToBlacklist);
+    getPolicyInfo().getRouterPolicyWeights().remove(subClusterToBlacklist);
+    initializePolicy(new YarnConfiguration());
+
+    FederationPoliciesTestUtil
+        .initializePolicyContext(getFederationPolicyContext(), getPolicy(),
+            getPolicyInfo(), getActiveSubclusters(), new Configuration());
+
+    List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
+    boolean relaxLocality = true;
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, "node4", Resource.newInstance(10, 1),
+            1, relaxLocality));
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1),
+            1));
+    requests.add(ResourceRequest
+        .newInstance(Priority.UNDEFINED, ResourceRequest.ANY,
+            Resource.newInstance(10, 1), 1));
+    ApplicationSubmissionContext asc = ApplicationSubmissionContext
+        .newInstance(null, null, null, null, null, false, false, 0,
+            Resources.none(), null, false, null, null);
+    asc.setAMContainerResourceRequests(requests);
+
+    try {
+      SubClusterId targetId =
+          ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null);
+      // The selected subcluster must not be the same as the one removed above.
+      Assert.assertNotEquals(targetId.getId(), subClusterToBlacklist);
+    } catch (FederationPolicyException e) {
+      Assert.fail();
+    }
+
+    // Set again the previous value for the other tests
+    getPolicyInfo().getRouterPolicyWeights()
+        .put(new SubClusterIdInfo(subClusterToBlacklist), value);
+  }
+}
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
index c969a30..d549250 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -47,10 +48,21 @@ public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest {
   public void setUp() throws Exception {
     setPolicy(new WeightedRandomRouterPolicy());
     setPolicyInfo(new WeightedPolicyInfo());
+
+    configureWeights(20);
+
+    FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+        getPolicyInfo(), getActiveSubclusters());
+  }
+
+  public void configureWeights(float numSubClusters) {
+    // Set random seed to remove random failures
+    FederationPolicyUtils.setRand(5);
+    setRand(5);
+
     Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
     Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
 
-    float numSubClusters = 20;
     // simulate N subclusters each with a 5% chance of being inactive
     for (int i = 0; i < numSubClusters; i++) {
       SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
@@ -74,10 +86,6 @@ public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest {
     }
     getPolicyInfo().setRouterPolicyWeights(routerWeights);
     getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
-
-    FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
-        getPolicyInfo(), getActiveSubclusters());
-
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org