You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/09 19:55:57 UTC

[39/50] [abbrv] hadoop git commit: YARN-7682. Expose canSatisfyConstraints utility function to validate a placement against a constraint. (Panagiotis Garefalakis via asuresh)

YARN-7682. Expose canSatisfyConstraints utility function to validate a placement against a constraint. (Panagiotis Garefalakis via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81ac40f4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81ac40f4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81ac40f4

Branch: refs/heads/YARN-6592
Commit: 81ac40f46dedd935f21083bd2315c715ad6c7267
Parents: cab36f0
Author: Arun Suresh <as...@apache.org>
Authored: Wed Jan 3 08:00:50 2018 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Tue Jan 9 11:54:47 2018 -0800

----------------------------------------------------------------------
 .../constraint/PlacementConstraintsUtil.java    | 132 +++++++++
 .../algorithm/DefaultPlacementAlgorithm.java    |  55 +---
 .../TestPlacementConstraintsUtil.java           | 287 +++++++++++++++++++
 .../constraint/TestPlacementProcessor.java      | 204 +++++++++++--
 4 files changed, 601 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/81ac40f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
new file mode 100644
index 0000000..956a3c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
+
+/**
+ * This class contains various static methods used by the Placement Algorithms
+ * to simplify constrained placement.
+ * (see also {@link DefaultPlacementAlgorithm}).
+ */
+@Public
+@Unstable
+public final class PlacementConstraintsUtil {
+
+  // Suppresses default constructor, ensuring non-instantiability.
+  private PlacementConstraintsUtil() {
+  }
+
+  /**
+   * Returns true if **single** application constraint with associated
+   * allocationTags and scope is satisfied by a specific scheduler Node.
+   *
+   * @param appId the application id
+   * @param sc the placement constraint
+   * @param te the target expression
+   * @param node the scheduler node
+   * @param tm the allocation tags store
+   * @return true if single application constraint is satisfied by node
+   * @throws InvalidAllocationTagsQueryException
+   */
+  private static boolean canSatisfySingleConstraintExpression(
+      ApplicationId appId, SingleConstraint sc, TargetExpression te,
+      SchedulerNode node, AllocationTagsManager tm)
+      throws InvalidAllocationTagsQueryException {
+    long minScopeCardinality = 0;
+    long maxScopeCardinality = 0;
+    if (sc.getScope() == PlacementConstraints.NODE) {
+      minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
+          te.getTargetValues(), Long::max);
+      maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
+          te.getTargetValues(), Long::min);
+    } else if (sc.getScope() == PlacementConstraints.RACK) {
+      minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
+          te.getTargetValues(), Long::max);
+      maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
+          te.getTargetValues(), Long::min);
+    }
+    // Make sure Anti-affinity satisfies hard upper limit
+    maxScopeCardinality = sc.getMaxCardinality() == 0 ? maxScopeCardinality - 1
+        : maxScopeCardinality;
+
+    return (minScopeCardinality >= sc.getMinCardinality()
+        && maxScopeCardinality < sc.getMaxCardinality());
+  }
+
+  /**
+   * Returns true if all application constraints with associated allocationTags
+   * are **currently** satisfied by a specific scheduler Node.
+   * To do so the method retrieves and goes through all application constraint
+   * expressions and checks if the specific allocation is between the allowed
+   * min-max cardinality values under the constraint scope (Node/Rack/etc).
+   *
+   * @param appId the application id
+   * @param allocationTags the allocation tags set
+   * @param node the scheduler node
+   * @param pcm the placement constraints store
+   * @param tagsManager the allocation tags store
+   * @return true if all application constraints are satisfied by node
+   * @throws InvalidAllocationTagsQueryException
+   */
+  public static boolean canSatisfyConstraints(ApplicationId appId,
+      Set<String> allocationTags, SchedulerNode node,
+      PlacementConstraintManager pcm, AllocationTagsManager tagsManager)
+      throws InvalidAllocationTagsQueryException {
+    PlacementConstraint constraint = pcm.getConstraint(appId, allocationTags);
+    if (constraint == null) {
+      return true;
+    }
+    // Transform to SimpleConstraint
+    SingleConstraintTransformer singleTransformer =
+        new SingleConstraintTransformer(constraint);
+    constraint = singleTransformer.transform();
+    AbstractConstraint sConstraintExpr = constraint.getConstraintExpr();
+    SingleConstraint single = (SingleConstraint) sConstraintExpr;
+    // Iterate through TargetExpressions
+    Iterator<TargetExpression> expIt = single.getTargetExpressions().iterator();
+    while (expIt.hasNext()) {
+      TargetExpression currentExp = expIt.next();
+      // Supporting AllocationTag Expressions for now
+      if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) {
+        // Check if conditions are met
+        if (!canSatisfySingleConstraintExpression(appId, single, currentExp,
+            node, tagsManager)) {
+          return false;
+        }
+      }
+    }
+    // return true if all targetExpressions are satisfied
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81ac40f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index 395c156..9ed9ab1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -19,19 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algor
 
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
@@ -65,58 +62,14 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
             .getNodes(filter);
   }
 
-  /**
-   * TODO: Method will be moved to PlacementConstraintsUtil class (YARN-7682)
-   * @param applicationId
-   * @param allocationTags
-   * @param nodeId
-   * @param tagsManager
-   * @return boolean
-   * @throws InvalidAllocationTagsQueryException
-   */
-  public boolean canAssign(ApplicationId applicationId,
-      Set<String> allocationTags, NodeId nodeId,
-      AllocationTagsManager tagsManager)
-      throws InvalidAllocationTagsQueryException {
-    PlacementConstraint constraint =
-        constraintManager.getConstraint(applicationId, allocationTags);
-    if (constraint == null) {
-      return true;
-    }
-    // TODO: proper transformations
-    // Currently works only for simple anti-affinity
-    // NODE scope target expressions
-    PlacementConstraintTransformations.SpecializedConstraintTransformer transformer =
-        new PlacementConstraintTransformations.SpecializedConstraintTransformer(
-            constraint);
-    PlacementConstraint transform = transformer.transform();
-    PlacementConstraint.TargetConstraint targetConstraint =
-        (PlacementConstraint.TargetConstraint) transform.getConstraintExpr();
-    // Assume a single target expression tag;
-    // The Sample Algorithm assumes a constraint will always be a simple
-    // Target Constraint with a single entry in the target set.
-    // As mentioned in the class javadoc - This algorithm should be
-    // used mostly for testing and validating end-2-end workflow.
-    String targetTag = targetConstraint.getTargetExpressions().iterator().next()
-        .getTargetValues().iterator().next();
-    // TODO: Assuming anti-affinity constraint
-    long nodeCardinality =
-        tagsManager.getNodeCardinality(nodeId, applicationId, targetTag);
-    if (nodeCardinality != 0) {
-      return false;
-    }
-    // return true if it is a valid placement
-    return true;
-  }
-
   public boolean attemptPlacementOnNode(ApplicationId appId,
       SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
       throws InvalidAllocationTagsQueryException {
     int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
     if (numAllocs > 0) {
-      if (canAssign(appId,
-          schedulingRequest.getAllocationTags(), schedulerNode.getNodeID(),
-          tagsManager)) {
+      if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
+          schedulingRequest.getAllocationTags(), schedulerNode,
+          constraintManager, tagsManager)) {
         return true;
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81ac40f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
new file mode 100644
index 0000000..7492233
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Test the PlacementConstraint Utility class functionality.
+ */
+public class TestPlacementConstraintsUtil {
+
+  private List<RMNode> rmNodes;
+  private RMContext rmContext;
+  private static final int GB = 1024;
+  private ApplicationId appId1;
+  private PlacementConstraint c1, c2, c3, c4;
+  private Set<String> sourceTag1, sourceTag2;
+  private Map<Set<String>, PlacementConstraint> constraintMap1, constraintMap2;
+
+  @Before
+  public void setup() {
+    MockRM rm = new MockRM();
+    rm.start();
+    MockNodes.resetHostIds();
+    rmNodes = MockNodes.newNodes(2, 2, Resource.newInstance(4096, 4));
+    for (RMNode rmNode : rmNodes) {
+      rm.getRMContext().getRMNodes().putIfAbsent(rmNode.getNodeID(), rmNode);
+    }
+    rmContext = rm.getRMContext();
+
+    // Build appIDs, constraints, source tags, and constraint map.
+    long ts = System.currentTimeMillis();
+    appId1 = BuilderUtils.newApplicationId(ts, 123);
+
+    c1 = PlacementConstraints.build(targetIn(NODE, allocationTag("hbase-m")));
+    c2 = PlacementConstraints.build(targetIn(RACK, allocationTag("hbase-rs")));
+    c3 = PlacementConstraints
+        .build(targetNotIn(NODE, allocationTag("hbase-m")));
+    c4 = PlacementConstraints
+        .build(targetNotIn(RACK, allocationTag("hbase-rs")));
+
+    sourceTag1 = new HashSet<>(Arrays.asList("spark"));
+    sourceTag2 = new HashSet<>(Arrays.asList("zk"));
+
+    constraintMap1 = Stream
+        .of(new AbstractMap.SimpleEntry<>(sourceTag1, c1),
+            new AbstractMap.SimpleEntry<>(sourceTag2, c2))
+        .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
+            AbstractMap.SimpleEntry::getValue));
+    constraintMap2 = Stream
+        .of(new AbstractMap.SimpleEntry<>(sourceTag1, c3),
+            new AbstractMap.SimpleEntry<>(sourceTag2, c4))
+        .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
+            AbstractMap.SimpleEntry::getValue));
+  }
+
+  @Test
+  public void testNodeAffinityAssignment()
+      throws InvalidAllocationTagsQueryException {
+    PlacementConstraintManagerService pcm =
+        new MemoryPlacementConstraintManager();
+    AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+    // Register App1 with affinity constraint map
+    pcm.registerApplication(appId1, constraintMap1);
+    // No containers are running so all 'zk' and 'spark' allocations should fail
+    // on every cluster NODE
+    Iterator<RMNode> nodeIterator = rmNodes.iterator();
+    while (nodeIterator.hasNext()) {
+      RMNode currentNode = nodeIterator.next();
+      FiCaSchedulerNode schedulerNode = TestUtils.getMockNode(
+          currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB);
+      Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+          sourceTag1, schedulerNode, pcm, tm));
+      Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+          sourceTag2, schedulerNode, pcm, tm));
+    }
+    /**
+     * Now place container:
+     * Node0:123 (Rack1):
+     *    container_app1_1 (hbase-m)
+     */
+    RMNode n0_r1 = rmNodes.get(0);
+    RMNode n1_r1 = rmNodes.get(1);
+    RMNode n2_r2 = rmNodes.get(2);
+    RMNode n3_r2 = rmNodes.get(3);
+    FiCaSchedulerNode schedulerNode0 = TestUtils
+        .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode1 = TestUtils
+        .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode2 = TestUtils
+        .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode3 = TestUtils
+        .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+    // 1 Containers on node 0 with allocationTag 'hbase-m'
+    ContainerId hbase_m = ContainerId
+        .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
+    tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
+
+    // 'spark' placement on Node0 should now SUCCEED
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag1, schedulerNode0, pcm, tm));
+    // FAIL on the rest of the nodes
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag1, schedulerNode1, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag1, schedulerNode2, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag1, schedulerNode3, pcm, tm));
+  }
+
+  @Test
+  public void testRackAffinityAssignment()
+      throws InvalidAllocationTagsQueryException {
+    PlacementConstraintManagerService pcm =
+        new MemoryPlacementConstraintManager();
+    AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+    // Register App1 with affinity constraint map
+    pcm.registerApplication(appId1, constraintMap1);
+    /**
+     * Now place container:
+     * Node0:123 (Rack1):
+     *    container_app1_1 (hbase-rs)
+     */
+    RMNode n0_r1 = rmNodes.get(0);
+    RMNode n1_r1 = rmNodes.get(1);
+    RMNode n2_r2 = rmNodes.get(2);
+    RMNode n3_r2 = rmNodes.get(3);
+    // 1 Containers on Node0-Rack1 with allocationTag 'hbase-rs'
+    ContainerId hbase_m = ContainerId
+        .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
+    tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
+
+    FiCaSchedulerNode schedulerNode0 = TestUtils
+        .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode1 = TestUtils
+        .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode2 = TestUtils
+        .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode3 = TestUtils
+        .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+    // 'zk' placement on Rack1 should now SUCCEED
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag2, schedulerNode0, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag2, schedulerNode1, pcm, tm));
+
+    // FAIL on the rest of the RACKs
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag2, schedulerNode2, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag2, schedulerNode3, pcm, tm));
+  }
+
+  @Test
+  public void testNodeAntiAffinityAssignment()
+      throws InvalidAllocationTagsQueryException {
+    PlacementConstraintManagerService pcm =
+        new MemoryPlacementConstraintManager();
+    AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+    // Register App1 with anti-affinity constraint map
+    pcm.registerApplication(appId1, constraintMap2);
+    /**
+     * place container:
+     * Node0:123 (Rack1):
+     *    container_app1_1 (hbase-m)
+     */
+    RMNode n0_r1 = rmNodes.get(0);
+    RMNode n1_r1 = rmNodes.get(1);
+    RMNode n2_r2 = rmNodes.get(2);
+    RMNode n3_r2 = rmNodes.get(3);
+    FiCaSchedulerNode schedulerNode0 = TestUtils
+        .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode1 = TestUtils
+        .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode2 = TestUtils
+        .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode3 = TestUtils
+        .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+    // 1 Containers on node 0 with allocationTag 'hbase-m'
+    ContainerId hbase_m = ContainerId
+        .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
+    tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
+
+    // 'spark' placement on Node0 should now FAIL
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag1, schedulerNode0, pcm, tm));
+    // SUCCEED on the rest of the nodes
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag1, schedulerNode1, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag1, schedulerNode2, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag1, schedulerNode3, pcm, tm));
+  }
+
+  @Test
+  public void testRackAntiAffinityAssignment()
+      throws InvalidAllocationTagsQueryException {
+    AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+    PlacementConstraintManagerService pcm =
+        new MemoryPlacementConstraintManager();
+    // Register App1 with anti-affinity constraint map
+    pcm.registerApplication(appId1, constraintMap2);
+    /**
+     * Place container:
+     * Node0:123 (Rack1):
+     *    container_app1_1 (hbase-rs)
+     */
+    RMNode n0_r1 = rmNodes.get(0);
+    RMNode n1_r1 = rmNodes.get(1);
+    RMNode n2_r2 = rmNodes.get(2);
+    RMNode n3_r2 = rmNodes.get(3);
+    // 1 Containers on Node0-Rack1 with allocationTag 'hbase-rs'
+    ContainerId hbase_m = ContainerId
+        .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
+    tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
+
+    FiCaSchedulerNode schedulerNode0 = TestUtils
+        .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode1 = TestUtils
+        .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode2 = TestUtils
+        .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode3 = TestUtils
+        .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+
+    // 'zk' placement on Rack1 should FAIL
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag2, schedulerNode0, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag2, schedulerNode1, pcm, tm));
+
+    // SUCCEED on the rest of the RACKs
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag2, schedulerNode2, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        sourceTag2, schedulerNode3, pcm, tm));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81ac40f4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index 87dd5b7..c260fe0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.RejectionReason;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -48,16 +49,21 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.lang.Thread.sleep;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
 
 /**
  * This tests end2end workflow of the constraint placement framework.
@@ -104,7 +110,7 @@ public class TestPlacementProcessor {
   }
 
   @Test(timeout = 300000)
-  public void testPlacement() throws Exception {
+  public void testAntiAffinityPlacement() throws Exception {
     HashMap<NodeId, MockNM> nodes = new HashMap<>();
     MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
     nodes.put(nm1.getNodeId(), nm1);
@@ -120,44 +126,174 @@ public class TestPlacementProcessor {
     nm4.registerNode();
 
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    // Containers with allocationTag 'foo' are restricted to 1 per NODE
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
-        Collections.singletonMap(
-            Collections.singleton("foo"),
+        Collections.singletonMap(Collections.singleton("foo"),
             PlacementConstraints.build(
-                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
-        ));
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))));
     am1.addSchedulingRequest(
-        Arrays.asList(
-            schedulingRequest(1, 1, 1, 512, "foo"),
+        Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
             schedulingRequest(1, 2, 1, 512, "foo"),
             schedulingRequest(1, 3, 1, 512, "foo"),
-            schedulingRequest(1, 5, 1, 512, "foo"))
-    );
+            schedulingRequest(1, 5, 1, 512, "foo")));
     AllocateResponse allocResponse = am1.schedule(); // send the request
     List<Container> allocatedContainers = new ArrayList<>();
     allocatedContainers.addAll(allocResponse.getAllocatedContainers());
 
     // kick the scheduler
-
-    while (allocatedContainers.size() < 4) {
-      nm1.nodeHeartbeat(true);
-      nm2.nodeHeartbeat(true);
-      nm3.nodeHeartbeat(true);
-      nm4.nodeHeartbeat(true);
-      LOG.info("Waiting for containers to be created for app 1...");
-      sleep(1000);
-      allocResponse = am1.schedule();
-      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
-    }
+    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 4);
 
     Assert.assertEquals(4, allocatedContainers.size());
-    Set<NodeId> nodeIds = allocatedContainers.stream()
-        .map(x -> x.getNodeId()).collect(Collectors.toSet());
-    // Ensure unique nodes
+    Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
+        .collect(Collectors.toSet());
+    // Ensure unique nodes (antiaffinity)
     Assert.assertEquals(4, nodeIds.size());
   }
 
   @Test(timeout = 300000)
+  public void testCardinalityPlacement() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    // Containers with allocationTag 'foo' should not exceed 4 per NODE
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(Collections.singleton("foo"),
+            PlacementConstraints.build(PlacementConstraints
+                .targetCardinality(NODE, 0, 4, allocationTag("foo")))));
+    am1.addSchedulingRequest(
+        Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 4, 1, 512, "foo"),
+            schedulingRequest(1, 5, 1, 512, "foo"),
+            schedulingRequest(1, 6, 1, 512, "foo"),
+            schedulingRequest(1, 7, 1, 512, "foo"),
+            schedulingRequest(1, 8, 1, 512, "foo")));
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+    // kick the scheduler
+    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 8);
+
+    Assert.assertEquals(8, allocatedContainers.size());
+    Map<NodeId, Long> nodeIdContainerIdMap =
+        allocatedContainers.stream().collect(
+            Collectors.groupingBy(c -> c.getNodeId(), Collectors.counting()));
+    // Ensure no more than 4 containers per node
+    for (NodeId n : nodeIdContainerIdMap.keySet()) {
+      Assert.assertTrue(nodeIdContainerIdMap.get(n) < 5);
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testAffinityPlacement() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    // Containers with allocationTag 'foo' should be placed where
+    // containers with allocationTag 'bar' are already running
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetIn(NODE, allocationTag("bar")))));
+    am1.addSchedulingRequest(
+        Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 4, 1, 512, "foo"),
+            schedulingRequest(1, 5, 1, 512, "foo")));
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+    // kick the scheduler
+    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
+
+    Assert.assertEquals(5, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
+        .collect(Collectors.toSet());
+    // Ensure all containers end up on the same node (affinity)
+    Assert.assertEquals(1, nodeIds.size());
+  }
+
+  @Test(timeout = 300000)
+  public void testComplexPlacement() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
+    // Containers with allocationTag 'bar' should not exceed 1 per NODE
+    constraintMap.put(Collections.singleton("bar"),
+        PlacementConstraints.build(targetNotIn(NODE, allocationTag("bar"))));
+    // Containers with allocationTag 'foo' should be placed where 'bar' exists
+    constraintMap.put(Collections.singleton("foo"),
+        PlacementConstraints.build(targetIn(NODE, allocationTag("bar"))));
+    // Containers with allocationTag 'foo' should not exceed 2 per NODE
+    constraintMap.put(Collections.singleton("foo"), PlacementConstraints
+        .build(targetCardinality(NODE, 0, 2, allocationTag("foo"))));
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, constraintMap);
+    am1.addSchedulingRequest(
+        Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
+            schedulingRequest(1, 2, 1, 512, "bar"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 4, 1, 512, "foo"),
+            schedulingRequest(1, 5, 1, 512, "foo"),
+            schedulingRequest(1, 6, 1, 512, "foo")));
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+    // kick the scheduler
+    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 6);
+
+    Assert.assertEquals(6, allocatedContainers.size());
+    Map<NodeId, Long> nodeIdContainerIdMap =
+        allocatedContainers.stream().collect(
+            Collectors.groupingBy(c -> c.getNodeId(), Collectors.counting()));
+    // Ensure no more than 3 containers per node (1 'bar', 2 'foo')
+    for (NodeId n : nodeIdContainerIdMap.keySet()) {
+      Assert.assertTrue(nodeIdContainerIdMap.get(n) < 4);
+    }
+  }
+
+  @Test(timeout = 300000)
   public void testSchedulerRejection() throws Exception {
     HashMap<NodeId, MockNM> nodes = new HashMap<>();
     MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
@@ -174,6 +310,7 @@ public class TestPlacementProcessor {
     nm4.registerNode();
 
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    // Containers with allocationTag 'foo' are restricted to 1 per NODE
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
         Collections.singletonMap(
             Collections.singleton("foo"),
@@ -196,7 +333,6 @@ public class TestPlacementProcessor {
     rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
 
     // kick the scheduler
-
     while (allocCount < 11) {
       nm1.nodeHeartbeat(true);
       nm2.nodeHeartbeat(true);
@@ -253,9 +389,10 @@ public class TestPlacementProcessor {
     nm2.registerNode();
     nm3.registerNode();
     nm4.registerNode();
-    // No not register nm5 yet..
+    // Do not register nm5 yet..
 
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    // Containers with allocationTag 'foo' are restricted to 1 per NODE
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
         Collections.singletonMap(
             Collections.singleton("foo"),
@@ -323,6 +460,7 @@ public class TestPlacementProcessor {
     nm4.registerNode();
 
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    // Containers with allocationTag 'foo' are restricted to 1 per NODE
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
         Collections.singletonMap(
             Collections.singleton("foo"),
@@ -346,7 +484,6 @@ public class TestPlacementProcessor {
     rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
 
     // kick the scheduler
-
     while (allocCount < 11) {
       nm1.nodeHeartbeat(true);
       nm2.nodeHeartbeat(true);
@@ -373,6 +510,21 @@ public class TestPlacementProcessor {
         rej.getReason());
   }
 
+  private static void waitForContainerAllocation(Collection<MockNM> nodes,
+      MockAM am, List<Container> allocatedContainers, int containerNum)
+      throws Exception {
+    while (allocatedContainers.size() < containerNum) {
+      for (MockNM node : nodes) {
+        node.nodeHeartbeat(true);
+      }
+      LOG.info("Waiting for containers to be created for "
+          + am.getApplicationAttemptId().getApplicationId() + "...");
+      sleep(1000);
+      AllocateResponse allocResponse = am.schedule();
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+    }
+  }
+
   protected static SchedulingRequest schedulingRequest(
       int priority, long allocReqId, int cores, int mem, String... tags) {
     return schedulingRequest(priority, allocReqId, cores, mem,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org