You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2018/08/28 09:33:32 UTC

hadoop git commit: YARN-8721. Relax NE node-attribute check when attribute doesn't exist on a node. Contributed by Sunil Govindan.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-3409 87f236cf0 -> 23433323e


YARN-8721. Relax NE node-attribute check when attribute doesn't exist on a node. Contributed by Sunil Govindan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/23433323
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/23433323
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/23433323

Branch: refs/heads/YARN-3409
Commit: 23433323ef901373a31eabba6b97b18def67320b
Parents: 87f236c
Author: Weiwei Yang <ww...@apache.org>
Authored: Tue Aug 28 17:25:19 2018 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Tue Aug 28 17:25:19 2018 +0800

----------------------------------------------------------------------
 .../yarn/nodelabels/NodeAttributesManager.java  |   7 +
 .../nodelabels/NodeAttributesManagerImpl.java   |  23 +++
 .../scheduler/capacity/CapacityScheduler.java   |   6 +
 .../constraint/PlacementConstraintsUtil.java    |  90 +++++----
 .../scheduler/capacity/TestUtils.java           |  20 +-
 ...stSingleConstraintAppPlacementAllocator.java | 192 ++++++++++++++++++-
 6 files changed, 286 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/23433323/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
index 68c6ec6..20f72d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
 
 /**
@@ -126,4 +127,10 @@ public abstract class NodeAttributesManager extends AbstractService {
 
   // futuristic
   // public set<NodeId> getNodesMatchingExpression(String nodeLabelExp);
+
+  /**
+   * Refresh node attributes on a given node during RM recovery.
+   * @param nodeId Node Id
+   */
+  public abstract void refreshNodeAttributesToScheduler(NodeId nodeId);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/23433323/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
index fd1037c..9cef87e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
@@ -724,4 +724,27 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
   public void setRMContext(RMContext context) {
     this.rmContext  = context;
   }
+
+  /**
+   * Refresh node attributes on a given node during RM recovery.
+   * @param nodeId Node Id
+   */
+  public void refreshNodeAttributesToScheduler(NodeId nodeId) {
+    String hostName = nodeId.getHost();
+    Map<String, Set<NodeAttribute>> newNodeToAttributesMap =
+        new HashMap<>();
+    Host host = nodeCollections.get(hostName);
+    if (host == null || host.attributes == null) {
+      return;
+    }
+    newNodeToAttributesMap.put(hostName, host.attributes.keySet());
+
+    // Notify RM
+    if (rmContext != null && rmContext.getDispatcher() != null) {
+      LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap
+          .values());
+      rmContext.getDispatcher().getEventHandler().handle(
+          new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/23433323/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index ffba701..a5644db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1989,6 +1989,12 @@ public class CapacityScheduler extends
             schedulerNode.getTotalResource());
       }
 
+      // recover attributes from store if any.
+      if (rmContext.getNodeAttributesManager() != null) {
+        rmContext.getNodeAttributesManager()
+            .refreshNodeAttributesToScheduler(schedulerNode.getNodeID());
+      }
+
       Resource clusterResource = getClusterResource();
       getRootQueue().updateClusterResource(clusterResource,
           new ResourceLimits(clusterResource));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/23433323/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index ccd334c..0227296 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -113,7 +113,7 @@ public final class PlacementConstraintsUtil {
             || maxScopeCardinality <= desiredMaxCardinality);
   }
 
-  private static boolean canSatisfyNodeConstraintExpresssion(
+  private static boolean canSatisfyNodeConstraintExpression(
       SingleConstraint sc, TargetExpression targetExpression,
       SchedulerNode schedulerNode) {
     Set<String> values = targetExpression.getTargetValues();
@@ -138,45 +138,67 @@ public final class PlacementConstraintsUtil {
         return true;
       }
 
-      if (schedulerNode.getNodeAttributes() == null ||
-          !schedulerNode.getNodeAttributes().contains(requestAttribute)) {
-        if(LOG.isDebugEnabled()) {
+      return getNodeConstraintEvaluatedResult(schedulerNode, opCode,
+          requestAttribute);
+    }
+    return true;
+  }
+
+  private static boolean getNodeConstraintEvaluatedResult(
+      SchedulerNode schedulerNode,
+      NodeAttributeOpCode opCode, NodeAttribute requestAttribute) {
+    // In case, attributes in a node is empty or incoming attributes doesn't
+    // exist on given node, accept such nodes for scheduling if opCode is
+    // equals to NE. (for eg. java != 1.8 could be scheduled on a node
+    // where java is not configured.)
+    if (schedulerNode.getNodeAttributes() == null ||
+        !schedulerNode.getNodeAttributes().contains(requestAttribute)) {
+      if (opCode == NodeAttributeOpCode.NE) {
+        if (LOG.isDebugEnabled()) {
           LOG.debug("Incoming requestAttribute:" + requestAttribute
-              + "is not present in " + schedulerNode.getNodeID());
+              + "is not present in " + schedulerNode.getNodeID()
+              + ", however opcode is NE. Hence accept this node.");
         }
-        return false;
+        return true;
       }
-      boolean found = false;
-      for (Iterator<NodeAttribute> it = schedulerNode.getNodeAttributes()
-          .iterator(); it.hasNext();) {
-        NodeAttribute nodeAttribute = it.next();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Starting to compare Incoming requestAttribute :"
-              + requestAttribute
-              + " with requestAttribute value= " + requestAttribute
-              .getAttributeValue()
-              + ", stored nodeAttribute value=" + nodeAttribute
-              .getAttributeValue());
-        }
-        if (requestAttribute.equals(nodeAttribute)) {
-          if (isOpCodeMatches(requestAttribute, nodeAttribute, opCode)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(
-                  "Incoming requestAttribute:" + requestAttribute
-                      + " matches with node:" + schedulerNode.getNodeID());
-            }
-            found = true;
-            return found;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Incoming requestAttribute:" + requestAttribute
+            + "is not present in " + schedulerNode.getNodeID()
+            + ", skip such node.");
+      }
+      return false;
+    }
+
+    boolean found = false;
+    for (Iterator<NodeAttribute> it = schedulerNode.getNodeAttributes()
+        .iterator(); it.hasNext();) {
+      NodeAttribute nodeAttribute = it.next();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Starting to compare Incoming requestAttribute :"
+            + requestAttribute
+            + " with requestAttribute value= " + requestAttribute
+            .getAttributeValue()
+            + ", stored nodeAttribute value=" + nodeAttribute
+            .getAttributeValue());
+      }
+      if (requestAttribute.equals(nodeAttribute)) {
+        if (isOpCodeMatches(requestAttribute, nodeAttribute, opCode)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "Incoming requestAttribute:" + requestAttribute
+                    + " matches with node:" + schedulerNode.getNodeID());
           }
+          found = true;
+          return found;
         }
       }
-      if (!found) {
-        if(LOG.isDebugEnabled()) {
-          LOG.info("skip this node:" + schedulerNode.getNodeID()
-              + " for requestAttribute:" + requestAttribute);
-        }
-        return false;
+    }
+    if (!found) {
+      if (LOG.isDebugEnabled()) {
+        LOG.info("skip this node:" + schedulerNode.getNodeID()
+            + " for requestAttribute:" + requestAttribute);
       }
+      return false;
     }
     return true;
   }
@@ -217,7 +239,7 @@ public final class PlacementConstraintsUtil {
         }
       } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)) {
         // This is a node attribute expression, check it.
-        if (!canSatisfyNodeConstraintExpresssion(singleConstraint, currentExp,
+        if (!canSatisfyNodeConstraintExpression(singleConstraint, currentExp,
             schedulerNode)) {
           return false;
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/23433323/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index b13790d..c692bae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -22,16 +22,7 @@ import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -205,7 +196,14 @@ public class TestUtils {
     ApplicationId applicationId = BuilderUtils.newApplicationId(0l, appId);
     return ApplicationAttemptId.newInstance(applicationId, attemptId);
   }
-  
+
+  public static FiCaSchedulerNode getMockNodeWithAttributes(String host,
+      String rack, int port, int memory, Set<NodeAttribute> attributes) {
+    FiCaSchedulerNode node = getMockNode(host, rack, port, memory, 1);
+    when(node.getNodeAttributes()).thenReturn(attributes);
+    return node;
+  }
+
   public static FiCaSchedulerNode getMockNode(String host, String rack,
       int port, int memory) {
     return getMockNode(host, rack, port, memory, 1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/23433323/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
index 902c6d5..a665b8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
@@ -18,14 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
 
+import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceSizing;
-import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
 import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -44,6 +38,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.function.LongBinaryOperator;
 
 import static org.mockito.Matchers.any;
@@ -326,4 +322,186 @@ public class TestSingleConstraintAppPlacementAllocator {
     Assert.assertFalse(allocator
         .precheckNode(node2, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
   }
+
+  @Test
+  public void testNodeAttributesFunctionality() {
+    // 1. Simple java=1.8 validation
+    SchedulingRequest schedulingRequest =
+        SchedulingRequest.newBuilder().executionType(
+            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+            .allocationRequestId(10L).priority(Priority.newInstance(1))
+            .placementConstraintExpression(PlacementConstraints
+                .targetNodeAttribute(PlacementConstraints.NODE,
+                    NodeAttributeOpCode.EQ,
+                    PlacementConstraints.PlacementTargets
+                        .nodeAttribute("java", "1.8"),
+                    PlacementConstraints.PlacementTargets.nodePartition(""))
+                .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+            .build();
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
+    Set<NodeAttribute> attributes = new HashSet<>();
+    attributes.add(
+        NodeAttribute.newInstance("java", NodeAttributeType.STRING, "1.8"));
+    boolean result = allocator.canAllocate(NodeType.NODE_LOCAL,
+        TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
+            attributes));
+    Assert.assertTrue("Allocation should be success for java=1.8", result);
+
+    // 2. verify python!=3 validation
+    SchedulingRequest schedulingRequest2 =
+        SchedulingRequest.newBuilder().executionType(
+            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+            .allocationRequestId(10L).priority(Priority.newInstance(1))
+            .placementConstraintExpression(PlacementConstraints
+                .targetNodeAttribute(PlacementConstraints.NODE,
+                    NodeAttributeOpCode.NE,
+                    PlacementConstraints.PlacementTargets
+                        .nodeAttribute("python", "3"),
+                    PlacementConstraints.PlacementTargets.nodePartition(""))
+                .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+            .build();
+    // Create allocator
+    allocator = new SingleConstraintAppPlacementAllocator();
+    allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest2, false);
+    attributes = new HashSet<>();
+    result = allocator.canAllocate(NodeType.NODE_LOCAL,
+        TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
+            attributes));
+    Assert.assertTrue("Allocation should be success as python doesn't exist",
+        result);
+
+    // 3. verify python!=3 validation when node has python=2
+    allocator = new SingleConstraintAppPlacementAllocator();
+    allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest2, false);
+    attributes = new HashSet<>();
+    attributes.add(
+        NodeAttribute.newInstance("python", NodeAttributeType.STRING, "2"));
+    result = allocator.canAllocate(NodeType.NODE_LOCAL,
+        TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
+            attributes));
+    Assert.assertTrue(
+        "Allocation should be success as python=3 doesn't exist in node",
+        result);
+
+    // 4. verify python!=3 validation when node has python=3
+    allocator = new SingleConstraintAppPlacementAllocator();
+    allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest2, false);
+    attributes = new HashSet<>();
+    attributes.add(
+        NodeAttribute.newInstance("python", NodeAttributeType.STRING, "3"));
+    result = allocator.canAllocate(NodeType.NODE_LOCAL,
+        TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
+            attributes));
+    Assert.assertFalse("Allocation should fail as python=3 exist in node",
+        result);
+  }
+
+  @Test
+  public void testConjunctionNodeAttributesFunctionality() {
+    // 1. verify and(python!=3:java=1.8) validation when node has python=3
+    SchedulingRequest schedulingRequest1 =
+        SchedulingRequest.newBuilder().executionType(
+            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+            .allocationRequestId(10L).priority(Priority.newInstance(1))
+            .placementConstraintExpression(
+                PlacementConstraints.and(
+                    PlacementConstraints
+                        .targetNodeAttribute(PlacementConstraints.NODE,
+                            NodeAttributeOpCode.NE,
+                            PlacementConstraints.PlacementTargets
+                                .nodeAttribute("python", "3")),
+                    PlacementConstraints
+                        .targetNodeAttribute(PlacementConstraints.NODE,
+                            NodeAttributeOpCode.EQ,
+                            PlacementConstraints.PlacementTargets
+                                .nodeAttribute("java", "1.8")))
+                    .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+            .build();
+    allocator = new SingleConstraintAppPlacementAllocator();
+    allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest1, false);
+    Set<NodeAttribute> attributes = new HashSet<>();
+    attributes.add(
+        NodeAttribute.newInstance("python", NodeAttributeType.STRING, "3"));
+    attributes.add(
+        NodeAttribute.newInstance("java", NodeAttributeType.STRING, "1.8"));
+    boolean result = allocator.canAllocate(NodeType.NODE_LOCAL,
+        TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
+            attributes));
+    Assert.assertFalse("Allocation should fail as python=3 exists in node",
+        result);
+
+    // 2. verify and(python!=3:java=1.8) validation when node has python=2
+    // and java=1.8
+    allocator = new SingleConstraintAppPlacementAllocator();
+    allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest1, false);
+    attributes = new HashSet<>();
+    attributes.add(
+        NodeAttribute.newInstance("python", NodeAttributeType.STRING, "2"));
+    attributes.add(
+        NodeAttribute.newInstance("java", NodeAttributeType.STRING, "1.8"));
+    result = allocator.canAllocate(NodeType.NODE_LOCAL,
+        TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
+            attributes));
+    Assert.assertTrue("Allocation should be success as python=2 exists in node",
+        result);
+
+    // 3. verify or(python!=3:java=1.8) validation when node has python=3
+    SchedulingRequest schedulingRequest2 =
+        SchedulingRequest.newBuilder().executionType(
+            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+            .allocationRequestId(10L).priority(Priority.newInstance(1))
+            .placementConstraintExpression(
+                PlacementConstraints.or(
+                    PlacementConstraints
+                        .targetNodeAttribute(PlacementConstraints.NODE,
+                            NodeAttributeOpCode.NE,
+                            PlacementConstraints.PlacementTargets
+                                .nodeAttribute("python", "3")),
+                    PlacementConstraints
+                        .targetNodeAttribute(PlacementConstraints.NODE,
+                            NodeAttributeOpCode.EQ,
+                            PlacementConstraints.PlacementTargets
+                                .nodeAttribute("java", "1.8")))
+                    .build()).resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
+            .build();
+    allocator = new SingleConstraintAppPlacementAllocator();
+    allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest2, false);
+    attributes = new HashSet<>();
+    attributes.add(
+        NodeAttribute.newInstance("python", NodeAttributeType.STRING, "3"));
+    attributes.add(
+        NodeAttribute.newInstance("java", NodeAttributeType.STRING, "1.8"));
+    result = allocator.canAllocate(NodeType.NODE_LOCAL,
+        TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
+            attributes));
+    Assert.assertTrue("Allocation should be success as java=1.8 exists in node",
+        result);
+
+    // 4. verify or(python!=3:java=1.8) validation when node has python=3
+    // and java=1.7.
+    allocator = new SingleConstraintAppPlacementAllocator();
+    allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+    allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest2, false);
+    attributes = new HashSet<>();
+    attributes.add(
+        NodeAttribute.newInstance("python", NodeAttributeType.STRING, "3"));
+    attributes.add(
+        NodeAttribute.newInstance("java", NodeAttributeType.STRING, "1.7"));
+    result = allocator.canAllocate(NodeType.NODE_LOCAL,
+        TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
+            attributes));
+    Assert
+        .assertFalse("Allocation should fail as java=1.8 doesnt exist in node",
+            result);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org