You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/02/02 18:29:08 UTC

hadoop git commit: YARN-7839. Modify PlacementAlgorithm to Check node capacity before placing request on node. (Panagiotis Garefalakis via asuresh)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 460d77bd6 -> 6e5ba9366


YARN-7839. Modify PlacementAlgorithm to Check node capacity before placing request on node. (Panagiotis Garefalakis via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e5ba936
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e5ba936
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e5ba936

Branch: refs/heads/trunk
Commit: 6e5ba9366fc05719906ff2789b1a0fd26001182b
Parents: 460d77b
Author: Arun Suresh <as...@apache.org>
Authored: Fri Feb 2 10:28:22 2018 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Fri Feb 2 10:28:22 2018 -0800

----------------------------------------------------------------------
 .../scheduler/capacity/CapacityScheduler.java   |  4 -
 .../algorithm/DefaultPlacementAlgorithm.java    | 61 ++++++++++----
 .../api/ConstraintPlacementAlgorithmOutput.java |  5 +-
 .../SchedulingRequestWithPlacementAttempt.java  | 52 ++++++++++++
 .../constraint/processor/BatchedRequests.java   |  2 +-
 .../processor/PlacementDispatcher.java          | 12 +--
 .../processor/PlacementProcessor.java           | 28 +++++--
 .../constraint/TestPlacementProcessor.java      | 87 +++++++++++++++++++-
 8 files changed, 215 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index cb01351..d3aa5cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2609,10 +2609,6 @@ public class CapacityScheduler extends
             " but only 1 will be attempted !!");
       }
       if (!appAttempt.isStopped()) {
-        Resource resource =
-            schedulingRequest.getResourceSizing().getResources();
-        schedulingRequest.getResourceSizing().setResources(
-            getNormalizedResource(resource));
         ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>
             resourceCommitRequest = createResourceCommitRequest(
             appAttempt, schedulingRequest, schedulerNode);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index 4e6473f..710e6c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -18,10 +18,15 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -35,8 +40,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.Co
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.NodeCandidateSelector;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,25 +64,31 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
   private LocalAllocationTagsManager tagsManager;
   private PlacementConstraintManager constraintManager;
   private NodeCandidateSelector nodeSelector;
+  private ResourceCalculator resourceCalculator;
 
   @Override
   public void init(RMContext rmContext) {
     this.tagsManager = new LocalAllocationTagsManager(
         rmContext.getAllocationTagsManager());
     this.constraintManager = rmContext.getPlacementConstraintManager();
+    this.resourceCalculator = rmContext.getScheduler().getResourceCalculator();
     this.nodeSelector =
         filter -> ((AbstractYarnScheduler) (rmContext).getScheduler())
             .getNodes(filter);
   }
 
-  public boolean attemptPlacementOnNode(ApplicationId appId,
-      SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
+  boolean attemptPlacementOnNode(ApplicationId appId,
+      Resource availableResources, SchedulingRequest schedulingRequest,
+      SchedulerNode schedulerNode, boolean ignoreResourceCheck)
       throws InvalidAllocationTagsQueryException {
-    if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
-        schedulingRequest, schedulerNode, constraintManager, tagsManager)) {
-      return true;
-    }
-    return false;
+    boolean fitsInNode = ignoreResourceCheck ||
+        Resources.fitsIn(resourceCalculator,
+            schedulingRequest.getResourceSizing().getResources(),
+            availableResources);
+    boolean constraintsSatisfied =
+        PlacementConstraintsUtil.canSatisfyConstraints(appId,
+        schedulingRequest, schedulerNode, constraintManager, tagsManager);
+    return fitsInNode && constraintsSatisfied;
   }
 
 
@@ -82,17 +96,19 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
   public void place(ConstraintPlacementAlgorithmInput input,
       ConstraintPlacementAlgorithmOutputCollector collector) {
     BatchedRequests requests = (BatchedRequests) input;
+    int placementAttempt = requests.getPlacementAttempt();
     ConstraintPlacementAlgorithmOutput resp =
         new ConstraintPlacementAlgorithmOutput(requests.getApplicationId());
     List<SchedulerNode> allNodes = nodeSelector.selectNodes(null);
 
     List<SchedulingRequest> rejectedRequests = new ArrayList<>();
+    Map<NodeId, Resource> availResources = new HashMap<>();
     int rePlacementCount = RE_ATTEMPT_COUNT;
     while (rePlacementCount > 0) {
-      doPlacement(requests, resp, allNodes, rejectedRequests);
+      doPlacement(requests, resp, allNodes, rejectedRequests, availResources);
       // Double check if placement constraints are really satisfied
       validatePlacement(requests.getApplicationId(), resp,
-          rejectedRequests);
+          rejectedRequests, availResources);
       if (rejectedRequests.size() == 0 || rePlacementCount == 1) {
         break;
       }
@@ -103,7 +119,10 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
       rePlacementCount--;
     }
 
-    resp.getRejectedRequests().addAll(rejectedRequests);
+    resp.getRejectedRequests().addAll(
+        rejectedRequests.stream().map(
+            x -> new SchedulingRequestWithPlacementAttempt(
+                placementAttempt, x)).collect(Collectors.toList()));
     collector.collect(resp);
     // Clean current temp-container tags
     this.tagsManager.cleanTempContainers(requests.getApplicationId());
@@ -112,7 +131,8 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
   private void doPlacement(BatchedRequests requests,
       ConstraintPlacementAlgorithmOutput resp,
       List<SchedulerNode> allNodes,
-      List<SchedulingRequest> rejectedRequests) {
+      List<SchedulingRequest> rejectedRequests,
+      Map<NodeId, Resource> availableResources) {
     Iterator<SchedulingRequest> requestIterator = requests.iterator();
     Iterator<SchedulerNode> nIter = allNodes.iterator();
     SchedulerNode lastSatisfiedNode = null;
@@ -135,11 +155,17 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
         try {
           String tag = schedulingRequest.getAllocationTags() == null ? "" :
               schedulingRequest.getAllocationTags().iterator().next();
+          Resource unallocatedResource =
+              availableResources.computeIfAbsent(node.getNodeID(),
+                  x -> Resource.newInstance(node.getUnallocatedResource()));
           if (!requests.getBlacklist(tag).contains(node.getNodeID()) &&
               attemptPlacementOnNode(
-                  requests.getApplicationId(), schedulingRequest, node)) {
+                  requests.getApplicationId(), unallocatedResource,
+                  schedulingRequest, node, false)) {
             schedulingRequest.getResourceSizing()
                 .setNumAllocations(--numAllocs);
+            Resources.addTo(unallocatedResource,
+                schedulingRequest.getResourceSizing().getResources());
             placedReq.getNodes().add(node);
             numAllocs =
                 schedulingRequest.getResourceSizing().getNumAllocations();
@@ -200,10 +226,12 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
    * @param applicationId
    * @param resp
    * @param rejectedRequests
+   * @param availableResources
    */
   private void validatePlacement(ApplicationId applicationId,
       ConstraintPlacementAlgorithmOutput resp,
-      List<SchedulingRequest> rejectedRequests) {
+      List<SchedulingRequest> rejectedRequests,
+      Map<NodeId, Resource> availableResources) {
     Iterator<PlacedSchedulingRequest> pReqIter =
         resp.getPlacedRequests().iterator();
     while (pReqIter.hasNext()) {
@@ -217,10 +245,13 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
           // Remove just the tags for this placement.
           this.tagsManager.removeTempTags(node.getNodeID(),
               applicationId, pReq.getSchedulingRequest().getAllocationTags());
-          if (!attemptPlacementOnNode(
-              applicationId, pReq.getSchedulingRequest(), node)) {
+          Resource availOnNode = availableResources.get(node.getNodeID());
+          if (!attemptPlacementOnNode(applicationId, availOnNode,
+              pReq.getSchedulingRequest(), node, true)) {
             nodeIter.remove();
             num++;
+            Resources.subtractFrom(availOnNode,
+                pReq.getSchedulingRequest().getResourceSizing().getResources());
           } else {
             // Add back the tags if everything is fine.
             this.tagsManager.addTempTags(node.getNodeID(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java
index 9571f0e..952bfbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -41,14 +40,14 @@ public class ConstraintPlacementAlgorithmOutput {
   private final List<PlacedSchedulingRequest> placedRequests =
       new ArrayList<>();
 
-  private final List<SchedulingRequest> rejectedRequests =
+  private final List<SchedulingRequestWithPlacementAttempt> rejectedRequests =
       new ArrayList<>();
 
   public List<PlacedSchedulingRequest> getPlacedRequests() {
     return placedRequests;
   }
 
-  public List<SchedulingRequest> getRejectedRequests() {
+  public List<SchedulingRequestWithPlacementAttempt> getRejectedRequests() {
     return rejectedRequests;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/SchedulingRequestWithPlacementAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/SchedulingRequestWithPlacementAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/SchedulingRequestWithPlacementAttempt.java
new file mode 100644
index 0000000..e14d235
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/SchedulingRequestWithPlacementAttempt.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api;
+
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+
+/**
+ * Simple holder class encapsulating a SchedulingRequest
+ * with a placement attempt.
+ */
+public class SchedulingRequestWithPlacementAttempt {
+
+  private final int placementAttempt;
+  private final SchedulingRequest schedulingRequest;
+
+  public SchedulingRequestWithPlacementAttempt(int placementAttempt,
+      SchedulingRequest schedulingRequest) {
+    this.placementAttempt = placementAttempt;
+    this.schedulingRequest = schedulingRequest;
+  }
+
+  public int getPlacementAttempt() {
+    return placementAttempt;
+  }
+
+  public SchedulingRequest getSchedulingRequest() {
+    return schedulingRequest;
+  }
+
+  @Override
+  public String toString() {
+    return "SchedulingRequestWithPlacementAttempt{" +
+        "placementAttempt=" + placementAttempt +
+        ", schedulingRequest=" + schedulingRequest +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
index 8e39b63..6badfee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
@@ -109,7 +109,7 @@ public class BatchedRequests
   }
 
   public void addToBlacklist(Set<String> tags, SchedulerNode node) {
-    if (tags != null && !tags.isEmpty()) {
+    if (tags != null && !tags.isEmpty() && node != null) {
       // We are currently assuming a single allocation tag
       // per scheduler request currently.
       blacklist.computeIfAbsent(tags.iterator().next(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
index 6a00ba8..849eb21 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
@@ -18,12 +18,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +49,7 @@ class PlacementDispatcher implements
 
   private Map<ApplicationId, List<PlacedSchedulingRequest>>
       placedRequests = new ConcurrentHashMap<>();
-  private Map<ApplicationId, List<SchedulingRequest>>
+  private Map<ApplicationId, List<SchedulingRequestWithPlacementAttempt>>
       rejectedRequests = new ConcurrentHashMap<>();
 
   public void init(RMContext rmContext,
@@ -90,12 +90,12 @@ class PlacementDispatcher implements
     return Collections.EMPTY_LIST;
   }
 
-  public List<SchedulingRequest> pullRejectedRequests(
+  public List<SchedulingRequestWithPlacementAttempt> pullRejectedRequests(
       ApplicationId applicationId) {
-    List<SchedulingRequest> rejectedReqs =
+    List<SchedulingRequestWithPlacementAttempt> rejectedReqs =
         this.rejectedRequests.get(applicationId);
     if (rejectedReqs != null && !rejectedReqs.isEmpty()) {
-      List<SchedulingRequest> retList = new ArrayList<>();
+      List<SchedulingRequestWithPlacementAttempt> retList = new ArrayList<>();
       synchronized (rejectedReqs) {
         if (rejectedReqs.size() > 0) {
           retList.addAll(rejectedReqs);
@@ -130,7 +130,7 @@ class PlacementDispatcher implements
       }
     }
     if (!placement.getRejectedRequests().isEmpty()) {
-      List<SchedulingRequest> rejected =
+      List<SchedulingRequestWithPlacementAttempt> rejected =
           rejectedRequests.computeIfAbsent(
               placement.getApplicationId(), k -> new ArrayList());
       LOG.warn(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
index 2a6b889..9ce38f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
 import org.apache.hadoop.yarn.api.records.RejectionReason;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.Placem
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -208,6 +210,12 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
   private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
       List<SchedulingRequest> schedulingRequests) {
     if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
+      // Normalize the Requests before dispatching
+      schedulingRequests.forEach(req -> {
+        Resource reqResource = req.getResourceSizing().getResources();
+        req.getResourceSizing()
+            .setResources(this.scheduler.getNormalizedResource(reqResource));
+      });
       this.placementDispatcher.dispatch(new BatchedRequests(iteratorType,
           appAttemptId.getApplicationId(), schedulingRequests, 1));
     }
@@ -261,20 +269,28 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
 
   private void handleRejectedRequests(ApplicationAttemptId appAttemptId,
       AllocateResponse response) {
-    List<SchedulingRequest> rejectedRequests =
+    List<SchedulingRequestWithPlacementAttempt> rejectedAlgoRequests =
         this.placementDispatcher.pullRejectedRequests(
             appAttemptId.getApplicationId());
-    if (rejectedRequests != null && !rejectedRequests.isEmpty()) {
+    if (rejectedAlgoRequests != null && !rejectedAlgoRequests.isEmpty()) {
       LOG.warn("Following requests of [{}] were rejected by" +
               " the PlacementAlgorithmOutput Algorithm: {}",
-          appAttemptId.getApplicationId(), rejectedRequests);
+          appAttemptId.getApplicationId(), rejectedAlgoRequests);
+      rejectedAlgoRequests.stream()
+          .filter(req -> req.getPlacementAttempt() < retryAttempts)
+          .forEach(req -> handleSchedulingResponse(
+              new Response(false, appAttemptId.getApplicationId(),
+                  req.getSchedulingRequest(), req.getPlacementAttempt(),
+                  null)));
       ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
-          rejectedRequests.stream()
+          rejectedAlgoRequests.stream()
+              .filter(req -> req.getPlacementAttempt() >= retryAttempts)
               .map(sr -> RejectedSchedulingRequest.newInstance(
-                  RejectionReason.COULD_NOT_PLACE_ON_NODE, sr))
+                  RejectionReason.COULD_NOT_PLACE_ON_NODE,
+                  sr.getSchedulingRequest()))
               .collect(Collectors.toList()));
     }
-    rejectedRequests =
+    List<SchedulingRequest> rejectedRequests =
         this.requestsToReject.get(appAttemptId.getApplicationId());
     if (rejectedRequests != null && !rejectedRequests.isEmpty()) {
       synchronized (rejectedRequests) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index a530230..c4c0b5d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -371,6 +371,91 @@ public class TestPlacementProcessor {
 
   @Test(timeout = 300000)
   public void testSchedulerRejection() throws Exception {
+    stopRM();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a", "b"});
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 15.0f);
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 85.0f);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+    startRM(conf);
+
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a");
+    // Containers with allocationTag 'foo' are restricted to 1 per NODE
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(
+            Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
+        ));
+    am1.addSchedulingRequest(
+        Arrays.asList(
+            schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            // Ask for a container larger than the node
+            schedulingRequest(1, 4, 1, 512, "foo"))
+    );
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    List<RejectedSchedulingRequest> rejectedReqs = new ArrayList<>();
+    int allocCount = 1;
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+    rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+
+    // kick the scheduler
+    while (allocCount < 11) {
+      nm1.nodeHeartbeat(true);
+      nm2.nodeHeartbeat(true);
+      nm3.nodeHeartbeat(true);
+      nm4.nodeHeartbeat(true);
+      LOG.info("Waiting for containers to be created for app 1...");
+      sleep(1000);
+      allocResponse = am1.schedule();
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+      rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+      allocCount++;
+      if (rejectedReqs.size() > 0 && allocatedContainers.size() > 2) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(3, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream()
+        .map(x -> x.getNodeId()).collect(Collectors.toSet());
+    // Ensure unique nodes
+    Assert.assertEquals(3, nodeIds.size());
+    RejectedSchedulingRequest rej = rejectedReqs.get(0);
+    Assert.assertEquals(4, rej.getRequest().getAllocationRequestId());
+    Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
+        rej.getReason());
+
+    QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
+    // Verify Metrics
+    verifyMetrics(metrics, 12288, 12, 4096, 4, 4);
+  }
+
+  @Test(timeout = 300000)
+  public void testNodeCapacityRejection() throws Exception {
     HashMap<NodeId, MockNM> nodes = new HashMap<>();
     MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
     nodes.put(nm1.getNodeId(), nm1);
@@ -432,7 +517,7 @@ public class TestPlacementProcessor {
     Assert.assertEquals(3, nodeIds.size());
     RejectedSchedulingRequest rej = rejectedReqs.get(0);
     Assert.assertEquals(4, rej.getRequest().getAllocationRequestId());
-    Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
+    Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE,
         rej.getReason());
 
     QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org