Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/31 15:55:40 UTC

[10/37] hadoop git commit: YARN-7783. Add validation step to ensure constraints are not violated due to order in which a request is processed. (asuresh)

YARN-7783. Add validation step to ensure constraints are not violated due to order in which a request is processed. (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a4c539fc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a4c539fc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a4c539fc

Branch: refs/heads/YARN-6592
Commit: a4c539fcdba817e313b2375abf2c4c9a1d13a4fd
Parents: 9b81cb0
Author: Arun Suresh <as...@apache.org>
Authored: Tue Jan 23 08:15:58 2018 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 31 01:30:17 2018 -0800

----------------------------------------------------------------------
 .../algorithm/DefaultPlacementAlgorithm.java    | 119 +++++++++++++++++--
 .../constraint/TestPlacementProcessor.java      |  49 ++++++++
 2 files changed, 155 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4c539fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index 9887749..4e6473f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -69,13 +70,9 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
   public boolean attemptPlacementOnNode(ApplicationId appId,
       SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
       throws InvalidAllocationTagsQueryException {
-    int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
-    if (numAllocs > 0) {
-      if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
-          schedulingRequest, schedulerNode,
-          constraintManager, tagsManager)) {
-        return true;
-      }
+    if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
+        schedulingRequest, schedulerNode, constraintManager, tagsManager)) {
+      return true;
     }
     return false;
   }
@@ -93,6 +90,9 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
     int rePlacementCount = RE_ATTEMPT_COUNT;
     while (rePlacementCount > 0) {
       doPlacement(requests, resp, allNodes, rejectedRequests);
+      // Double check if placement constraints are really satisfied
+      validatePlacement(requests.getApplicationId(), resp,
+          rejectedRequests);
       if (rejectedRequests.size() == 0 || rePlacementCount == 1) {
         break;
       }
@@ -122,9 +122,14 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
         break;
       }
       SchedulingRequest schedulingRequest = requestIterator.next();
+      PlacedSchedulingRequest placedReq =
+          new PlacedSchedulingRequest(schedulingRequest);
+      placedReq.setPlacementAttempt(requests.getPlacementAttempt());
+      resp.getPlacedRequests().add(placedReq);
       CircularIterator<SchedulerNode> nodeIter =
           new CircularIterator(lastSatisfiedNode, nIter, allNodes);
-      int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
+      int numAllocs =
+          schedulingRequest.getResourceSizing().getNumAllocations();
       while (nodeIter.hasNext() && numAllocs > 0) {
         SchedulerNode node = nodeIter.next();
         try {
@@ -135,11 +140,7 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
                   requests.getApplicationId(), schedulingRequest, node)) {
             schedulingRequest.getResourceSizing()
                 .setNumAllocations(--numAllocs);
-            PlacedSchedulingRequest placedReq =
-                new PlacedSchedulingRequest(schedulingRequest);
-            placedReq.setPlacementAttempt(requests.getPlacementAttempt());
             placedReq.getNodes().add(node);
-            resp.getPlacedRequests().add(placedReq);
             numAllocs =
                 schedulingRequest.getResourceSizing().getNumAllocations();
             // Add temp-container tags for current placement cycle
@@ -156,6 +157,98 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
     // Add all requests whose numAllocations still > 0 to rejected list.
     requests.getSchedulingRequests().stream()
         .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0)
-        .forEach(rejReq -> rejectedRequests.add(rejReq));
+        .forEach(rejReq -> rejectedRequests.add(cloneReq(rejReq)));
   }
+
+  /**
+   * During the placement phase, allocation tags are added to the node if the
+   * constraint is satisfied. But depending on the order in which the
+   * algorithm sees the requests, it is possible that a constraint that
+   * happened to be valid during placement of an earlier-seen request might
+   * not be valid after all subsequent requests have been placed.
+   *
+   * For example:
+   *   Assume nodes n1, n2, n3, n4 and n5
+   *
+   *   Consider the 2 constraints:
+   *   1) "foo", anti-affinity with "foo"
+   *   2) "bar", anti-affinity with "foo"
+   *
+   *   And 2 requests
+   *   req1: NumAllocations = 4, allocTags = [foo]
+   *   req2: NumAllocations = 1, allocTags = [bar]
+   *
+   *   If "req1" is seen first, the algorithm can place the 4 containers on
+   *   n1, n2, n3 and n4. When it gets to "req2", it will see that 4 nodes
+   *   already have the "foo" tag and will place "req2" on n5.
+   *   But if "req2" is seen first, then "bar" will be placed on any node,
+   *   since no node currently has "foo". When it gets to "req1", since
+   *   "foo" has no anti-affinity with "bar", the algorithm can end up placing
+   *   "foo" on a node with "bar", violating the second constraint.
+   *
+   * To prevent the above, we need a validation step: after the placements for
+   * a batch of requests are made, for each request, we remove its tags from
+   * the node and check whether constraints are still satisfied if the tag
+   * were to be added back on the node.
+   *
+   *   When applied to the example above, after "req2" and "req1" are placed,
+   *   we remove the "bar" tag from the node and try to add it back on the node.
+   *   This time, constraint satisfaction will fail, since there is now a "foo"
+   *   tag on the node and "bar" cannot be added. The algorithm will then
+   *   retry placing "req2" on another node.
+   *
+   * @param applicationId the application whose requests are being placed
+   * @param resp the placement algorithm output containing placed requests
+   * @param rejectedRequests list to which requests that fail validation
+   *          are added so they can be re-attempted
+   */
+  private void validatePlacement(ApplicationId applicationId,
+      ConstraintPlacementAlgorithmOutput resp,
+      List<SchedulingRequest> rejectedRequests) {
+    Iterator<PlacedSchedulingRequest> pReqIter =
+        resp.getPlacedRequests().iterator();
+    while (pReqIter.hasNext()) {
+      PlacedSchedulingRequest pReq = pReqIter.next();
+      Iterator<SchedulerNode> nodeIter = pReq.getNodes().iterator();
+      // Assuming all reqs were satisfied.
+      int num = 0;
+      while (nodeIter.hasNext()) {
+        SchedulerNode node = nodeIter.next();
+        try {
+          // Remove just the tags for this placement.
+          this.tagsManager.removeTempTags(node.getNodeID(),
+              applicationId, pReq.getSchedulingRequest().getAllocationTags());
+          if (!attemptPlacementOnNode(
+              applicationId, pReq.getSchedulingRequest(), node)) {
+            nodeIter.remove();
+            num++;
+          } else {
+            // Add back the tags if everything is fine.
+            this.tagsManager.addTempTags(node.getNodeID(),
+                applicationId, pReq.getSchedulingRequest().getAllocationTags());
+          }
+        } catch (InvalidAllocationTagsQueryException e) {
+          LOG.warn("Got exception from TagManager!", e);
+        }
+      }
+      if (num > 0) {
+        SchedulingRequest sReq = cloneReq(pReq.getSchedulingRequest());
+        sReq.getResourceSizing().setNumAllocations(num);
+        rejectedRequests.add(sReq);
+      }
+      if (pReq.getNodes().isEmpty()) {
+        pReqIter.remove();
+      }
+    }
+  }
+
+  private static SchedulingRequest cloneReq(SchedulingRequest sReq) {
+    return SchedulingRequest.newInstance(
+        sReq.getAllocationRequestId(), sReq.getPriority(),
+        sReq.getExecutionType(), sReq.getAllocationTags(),
+        ResourceSizing.newInstance(
+            sReq.getResourceSizing().getNumAllocations(),
+            sReq.getResourceSizing().getResources()),
+        sReq.getPlacementConstraint());
+  }
+
 }
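
For context, the following is a minimal, self-contained sketch of the ordering problem and the re-validation pass described in the javadoc above. It is plain Java with no YARN classes; all names (ValidationSketch, satisfied, antiAffinity, etc.) are illustrative and are not part of this patch or of the YARN API.

import java.util.*;

/**
 * Illustrative model: "foo" is anti-affine with "foo" (at most one per node)
 * and "bar" is anti-affine with "foo".
 */
public class ValidationSketch {

  // node -> tags currently placed on that node
  static Map<String, Set<String>> nodeTags = new HashMap<>();

  // tag -> tags it must not share a node with
  static Map<String, Set<String>> antiAffinity = Map.of(
      "foo", Set.of("foo"),
      "bar", Set.of("foo"));

  static boolean satisfied(String tag, String node) {
    Set<String> present = nodeTags.getOrDefault(node, Set.of());
    return Collections.disjoint(present, antiAffinity.get(tag));
  }

  public static void main(String[] args) {
    List<String> nodes = List.of("n1", "n2", "n3", "n4", "n5");

    // Greedy placement in request order: "bar" is seen before the "foo"s.
    List<String> requestOrder = List.of("bar", "foo", "foo", "foo", "foo");
    Map<String, String> placements = new LinkedHashMap<>();
    int id = 0;
    for (String tag : requestOrder) {
      for (String node : nodes) {
        if (satisfied(tag, node)) {
          nodeTags.computeIfAbsent(node, k -> new HashSet<>()).add(tag);
          placements.put(tag + "#" + id++, node);
          break;
        }
      }
    }
    // "bar" landed on n1 before any "foo" existed, and a "foo" later joined
    // it there: {bar#0=n1, foo#1=n1, foo#2=n2, ...}
    System.out.println("After placement: " + placements);

    // Validation pass: remove each placement's tag and re-check whether it
    // could still be added back; if not, that request must be re-placed.
    for (Map.Entry<String, String> e : placements.entrySet()) {
      String tag = e.getKey().substring(0, e.getKey().indexOf('#'));
      String node = e.getValue();
      nodeTags.get(node).remove(tag);
      boolean stillValid = satisfied(tag, node);
      nodeTags.get(node).add(tag);  // put the tag back in this sketch
      if (!stillValid) {
        System.out.println(e.getKey() + " on " + node
            + " now violates its constraint and would be re-attempted");
      }
    }
  }
}

Running this prints that bar#0 on n1 violates its constraint after all placements are made, which corresponds to the rejected request that validatePlacement() feeds back into the next placement attempt in the patch above.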

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4c539fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index 65daeb8..8426b20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -151,6 +151,55 @@ public class TestPlacementProcessor {
   }
 
   @Test(timeout = 300000)
+  public void testMutualAntiAffinityPlacement() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    MockNM nm5 = new MockNM("h5:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm5.getNodeId(), nm5);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+    nm5.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    // Containers with allocationTag 'foo' are restricted to 1 per NODE
+    Map<Set<String>, PlacementConstraint> pcMap = new HashMap<>();
+    pcMap.put(Collections.singleton("foo"),
+        PlacementConstraints.build(
+            PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
+    pcMap.put(Collections.singleton("bar"),
+        PlacementConstraints.build(
+            PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, pcMap);
+    am1.addSchedulingRequest(
+        Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 4, 1, 512, "foo"),
+            schedulingRequest(1, 5, 1, 512, "foo")));
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+    // kick the scheduler
+    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
+
+    Assert.assertEquals(5, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
+        .collect(Collectors.toSet());
+    // Ensure unique nodes (antiaffinity)
+    Assert.assertEquals(5, nodeIds.size());
+  }
+
+  @Test(timeout = 300000)
   public void testCardinalityPlacement() throws Exception {
     HashMap<NodeId, MockNM> nodes = new HashMap<>();
     MockNM nm1 = new MockNM("h1:1234", 8192, rm.getResourceTrackerService());

