You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/17 21:59:00 UTC

[06/11] hadoop git commit: YARN-7612. Add Processor Framework for Rich Placement Constraints. (asuresh)

YARN-7612. Add Processor Framework for Rich Placement Constraints. (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e802a6c0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e802a6c0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e802a6c0

Branch: refs/heads/YARN-6592
Commit: e802a6c06cb46462ad84b77e4df663d075c02316
Parents: f650d39
Author: Arun Suresh <as...@apache.org>
Authored: Fri Dec 22 15:51:20 2017 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 17 13:51:28 2018 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  26 ++
 .../src/main/resources/yarn-default.xml         |  30 ++
 .../ApplicationMasterService.java               |  15 +
 .../rmcontainer/RMContainerImpl.java            |   7 +-
 .../scheduler/capacity/CapacityScheduler.java   |   2 +
 .../constraint/processor/BatchedRequests.java   | 105 +++++
 .../processor/NodeCandidateSelector.java        |  38 ++
 .../processor/PlacementDispatcher.java          | 145 +++++++
 .../processor/PlacementProcessor.java           | 343 ++++++++++++++++
 .../processor/SamplePlacementAlgorithm.java     | 144 +++++++
 .../constraint/processor/package-info.java      |  29 ++
 .../yarn/server/resourcemanager/MockAM.java     |  26 ++
 .../yarn/server/resourcemanager/MockRM.java     |  14 +
 .../constraint/TestPlacementProcessor.java      | 394 +++++++++++++++++++
 14 files changed, 1316 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1b6bd0e..03c24d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -529,6 +529,32 @@ public class YarnConfiguration extends Configuration {
   /** The class to use as the resource scheduler.*/
   public static final String RM_SCHEDULER = 
     RM_PREFIX + "scheduler.class";
+
+  /** Class implementing the constraint placement algorithm. */
+  public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS =
+      RM_PREFIX + "placement-constraints.algorithm.class";
+
+  /** Whether constraint placement is enabled; same default as the XML. */
+  public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED =
+      RM_PREFIX + "placement-constraints.enabled";
+  public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = false;
+
+  /** Number of times to retry placing rejected SchedulingRequests. */
+  public static final String RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS =
+      RM_PREFIX + "placement-constraints.retry-attempts";
+  public static final int DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS = 3;
+
+  /** Thread pool size for the placement algorithm. */
+  public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE =
+      RM_PREFIX + "placement-constraints.algorithm.pool-size";
+  public static final int DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE =
+      1;
+
+  /** Thread pool size for the scheduler invocation phase. */
+  public static final String RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE =
+      RM_PREFIX + "placement-constraints.scheduler.pool-size";
+  public static final int DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE =
+      1;
  
   public static final String DEFAULT_RM_SCHEDULER = 
       "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index d450eca..0285069 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -131,6 +131,36 @@
   </property>
 
   <property>
+    <description>Enable Constraint Placement.</description>
+    <name>yarn.resourcemanager.placement-constraints.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>Number of times to retry placing rejected SchedulingRequests.</description>
+    <name>yarn.resourcemanager.placement-constraints.retry-attempts</name>
+    <value>3</value>
+  </property>
+
+  <property>
+    <description>Constraint Placement Algorithm to be used.</description>
+    <name>yarn.resourcemanager.placement-constraints.algorithm.class</name>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SamplePlacementAlgorithm</value>
+  </property>
+
+  <property>
+    <description>Threadpool size for the Algorithm used for placement constraint processing.</description>
+    <name>yarn.resourcemanager.placement-constraints.algorithm.pool-size</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <description>Threadpool size for the Scheduler invocation phase of placement constraint processing.</description>
+    <name>yarn.resourcemanager.placement-constraints.scheduler.pool-size</name>
+    <value>1</value>
+  </property>
+
+  <property>
     <description>
       Comma separated class names of ApplicationMasterServiceProcessor
       implementations. The processors will be applied in the order

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 90c42be..aa1177d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementProcessor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@@ -114,11 +115,25 @@ public class ApplicationMasterService extends AbstractService implements
         YarnConfiguration.RM_SCHEDULER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+    initializeProcessingChain(conf);
+  }
+
+  private void initializeProcessingChain(Configuration conf) {
     amsProcessingChain.init(rmContext, null);
+    boolean enablePlacementConstraints = conf.getBoolean(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED,
+        YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED);
+    if (enablePlacementConstraints) {
+      amsProcessingChain.addProcessor(new PlacementProcessor());
+    }
     List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf);
     if (processors != null) {
       Collections.reverse(processors);
       for (ApplicationMasterServiceProcessor p : processors) {
+        // Ensure only single instance of PlacementProcessor is included
+        if (enablePlacementConstraints && p instanceof PlacementProcessor) {
+          continue;
+        }
         this.amsProcessingChain.addProcessor(p);
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 184cdfc..c873509 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -190,8 +190,7 @@ public class RMContainerImpl implements RMContainer {
   private boolean isExternallyAllocated;
   private SchedulerRequestKey allocatedSchedulerKey;
 
-  // TODO, set it when container allocated by scheduler (From SchedulingRequest)
-  private Set<String> allocationTags = null;
+  private volatile Set<String> allocationTags = null;
 
   public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -510,6 +509,10 @@ public class RMContainerImpl implements RMContainer {
     return allocationTags;
   }
 
+  public void setAllocationTags(Set<String> tags) {
+    this.allocationTags = tags; // the reference is stored as-is, not copied
+  }
+
   private static class BaseTransition implements
       SingleArcTransition<RMContainerImpl, RMContainerEvent> {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index d92ce58..f03d7d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2539,6 +2539,8 @@ public class CapacityScheduler extends
           SchedulerRequestKey.extractFrom(container),
           appAttempt.getApplicationAttemptId(), container.getNodeId(),
           appAttempt.getUser(), rmContext, false);
+      ((RMContainerImpl)rmContainer).setAllocationTags(
+          new HashSet<>(schedulingRequest.getAllocationTags()));
 
       allocated = new ContainerAllocationProposal<>(
           getSchedulerContainer(rmContainer, true),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
new file mode 100644
index 0000000..fe92d2f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A grouping of Scheduling Requests which are sent to the PlacementAlgorithm
+ * to place as a batch. The placement algorithm tends to give more optimal
+ * placements if more requests are batched together.
+ */
+class BatchedRequests implements ConstraintPlacementAlgorithmInput {
+
+  // Placement attempt - the number of times the requests in this batch
+  // have been placed but were then rejected by the scheduler.
+  private final int placementAttempt;
+  private final ApplicationId applicationId;
+  private final Collection<SchedulingRequest> requests;
+  private final Map<String, Set<NodeId>> blacklist = new HashMap<>();
+
+  /** Wraps (does not copy) the given requests; addToBatch() mutates them. */
+  BatchedRequests(ApplicationId applicationId,
+      Collection<SchedulingRequest> requests, int attempt) {
+    this.applicationId = applicationId;
+    this.requests = requests;
+    this.placementAttempt = attempt;
+  }
+
+  /**
+   * Get Application Id.
+   * @return Application Id.
+   */
+  ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+  /**
+   * Get Collection of SchedulingRequests in this batch.
+   * @return Collection of Scheduling Requests.
+   */
+  @Override
+  public Collection<SchedulingRequest> getSchedulingRequests() {
+    return requests;
+  }
+
+  /**
+   * Add a Scheduling request to the batch.
+   * @param req Scheduling Request.
+   */
+  void addToBatch(SchedulingRequest req) {
+    requests.add(req);
+  }
+
+  /** Blacklist the node for the first tag (one tag per request assumed). */
+  void addToBlacklist(Set<String> tags, SchedulerNode node) {
+    if (tags == null || tags.isEmpty()) {
+      return;
+    }
+    blacklist.computeIfAbsent(tags.iterator().next(),
+        k -> new HashSet<>()).add(node.getNodeID());
+  }
+
+  /**
+   * Get placement attempt.
+   * @return the placement attempt count this batch was created with.
+   */
+  int getPlacementAttempt() {
+    return placementAttempt;
+  }
+
+  /**
+   * Get any blacklisted nodes associated with tag.
+   * @param tag Tag.
+   * @return Set of blacklisted Nodes, or an empty set if there are none.
+   */
+  Set<NodeId> getBlacklist(String tag) {
+    return blacklist.getOrDefault(tag, Collections.emptySet());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NodeCandidateSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NodeCandidateSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NodeCandidateSelector.java
new file mode 100644
index 0000000..4299050
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NodeCandidateSelector.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeFilter;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import java.util.List;
+
+/**
+ * A read-only abstraction of the ClusterNodeTracker which exposes a single
+ * method that simply returns a filtered list of nodes.
+ */
+public interface NodeCandidateSelector {
+
+  /**
+   * Select a list of nodes given a filter.
+   * @param filter the NodeFilter to apply.
+   * @return List of SchedulerNodes.
+   */
+  List<SchedulerNode> selectNodes(NodeFilter filter);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
new file mode 100644
index 0000000..6a00ba8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * This class initializes the Constraint Placement Algorithm. It dispatches
+ * input to the algorithm on a thread pool and buffers the output it collects,
+ * per application, until that output is pulled.
+ */
+class PlacementDispatcher implements
+    ConstraintPlacementAlgorithmOutputCollector {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PlacementDispatcher.class);
+  private ConstraintPlacementAlgorithm algorithm;
+  private ExecutorService algorithmThreadPool;
+
+  private final Map<ApplicationId, List<PlacedSchedulingRequest>>
+      placedRequests = new ConcurrentHashMap<>();
+  private final Map<ApplicationId, List<SchedulingRequest>>
+      rejectedRequests = new ConcurrentHashMap<>();
+
+  /** Initialize the algorithm and create its fixed-size thread pool. */
+  public void init(RMContext rmContext,
+      ConstraintPlacementAlgorithm placementAlgorithm, int poolSize) {
+    LOG.info("Initializing Constraint Placement Planner:");
+    this.algorithm = placementAlgorithm;
+    this.algorithm.init(rmContext);
+    this.algorithmThreadPool = Executors.newFixedThreadPool(poolSize);
+  }
+
+  /** Asynchronously run the placement algorithm on the given batch. */
+  void dispatch(final BatchedRequests batchedRequests) {
+    final ConstraintPlacementAlgorithmOutputCollector collector = this;
+    Runnable placingTask = () -> {
+      LOG.debug("Got [{}] requests to place from application [{}].. " +
+              "Attempt count [{}]",
+          batchedRequests.getSchedulingRequests().size(),
+          batchedRequests.getApplicationId(),
+          batchedRequests.getPlacementAttempt());
+      try {
+        algorithm.place(batchedRequests, collector);
+      } catch (RuntimeException e) {
+        // The Future from submit() is dropped, so report failures here.
+        LOG.error("Placement algorithm failed for application [{}]",
+            batchedRequests.getApplicationId(), e);
+      }
+    };
+    this.algorithmThreadPool.submit(placingTask);
+  }
+
+  /** Drain and return the placed requests buffered for the application. */
+  public List<PlacedSchedulingRequest> pullPlacedRequests(
+      ApplicationId applicationId) {
+    return drain(this.placedRequests.get(applicationId));
+  }
+
+  /** Drain and return the rejected requests buffered for the application. */
+  public List<SchedulingRequest> pullRejectedRequests(
+      ApplicationId applicationId) {
+    return drain(this.rejectedRequests.get(applicationId));
+  }
+
+  /** Copy out and clear a buffer under its monitor; null means no output. */
+  private static <T> List<T> drain(List<T> buffer) {
+    if (buffer == null) {
+      return Collections.emptyList();
+    }
+    // collect() appends under the same monitor, so every read of the buffer
+    // (including the emptiness check) has to happen while holding it.
+    synchronized (buffer) {
+      List<T> drained = new ArrayList<>(buffer);
+      buffer.clear();
+      return drained;
+    }
+  }
+
+  /** Drop all buffered output of the given application. */
+  void clearApplicationState(ApplicationId applicationId) {
+    placedRequests.remove(applicationId);
+    rejectedRequests.remove(applicationId);
+  }
+
+  /** Buffer the algorithm output under its application until it is pulled. */
+  @Override
+  public void collect(ConstraintPlacementAlgorithmOutput placement) {
+    if (!placement.getPlacedRequests().isEmpty()) {
+      List<PlacedSchedulingRequest> processed =
+          placedRequests.computeIfAbsent(
+              placement.getApplicationId(), k -> new ArrayList<>());
+      LOG.debug(
+          "Planning Algorithm has placed for application [{}]" +
+              " the following [{}]", placement.getApplicationId(),
+          placement.getPlacedRequests());
+      synchronized (processed) {
+        placement.getPlacedRequests().forEach(processed::add);
+      }
+    }
+    if (!placement.getRejectedRequests().isEmpty()) {
+      List<SchedulingRequest> rejected =
+          rejectedRequests.computeIfAbsent(
+              placement.getApplicationId(), k -> new ArrayList<>());
+      LOG.warn(
+          "Planning Algorithm has rejected for application [{}]" +
+              " the following [{}]", placement.getApplicationId(),
+          placement.getRejectedRequests());
+      synchronized (rejected) {
+        rejected.addAll(placement.getRejectedRequests());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
new file mode 100644
index 0000000..d613d4e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
+import org.apache.hadoop.yarn.api.records.RejectionReason;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+/**
+ * An ApplicationMasterService Processor that performs Constrained placement of
+ * Scheduling Requests. It does the following:
+ * 1. All initialization.
+ * 2. Intercepts placement constraints from the register call and adds it to
+ *    the placement constraint manager.
+ * 3. Dispatches Scheduling Requests to the Planner.
+ */
+public class PlacementProcessor implements ApplicationMasterServiceProcessor {
+
+  /**
+   * Wrapper over the SchedulingResponse that wires in the placement attempt
+   * and last attempted Node.
+   */
+  static final class Response extends SchedulingResponse {
+
+    // Placement attempt number the wrapped scheduling result belongs to.
+    private final int placementAttempt;
+    // Node on which the allocation was attempted.
+    private final SchedulerNode attemptedNode;
+
+    // Private: instances are only created by the enclosing processor.
+    private Response(boolean isSuccess, ApplicationId applicationId,
+        SchedulingRequest schedulingRequest, int placementAttempt,
+        SchedulerNode attemptedNode) {
+      super(isSuccess, applicationId, schedulingRequest);
+      this.placementAttempt = placementAttempt;
+      this.attemptedNode = attemptedNode;
+    }
+  }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PlacementProcessor.class);
+  private PlacementConstraintManager constraintManager;
+  private ApplicationMasterServiceProcessor nextAMSProcessor;
+
+  private AbstractYarnScheduler scheduler;
+  private ExecutorService schedulingThreadPool;
+  private int retryAttempts;
+  private Map<ApplicationId, List<BatchedRequests>> requestsToRetry =
+      new ConcurrentHashMap<>();
+  private Map<ApplicationId, List<SchedulingRequest>> requestsToReject =
+      new ConcurrentHashMap<>();
+
+  private PlacementDispatcher placementDispatcher;
+
+
+  /**
+   * Wires up the processor: stores the next processor of the chain, obtains
+   * the constraint manager and the scheduler from the context, instantiates
+   * the configured placement algorithm (falling back to
+   * {@link SamplePlacementAlgorithm}) and creates the placement dispatcher
+   * and the scheduling thread pool.
+   *
+   * @param amsContext AMS context; it is cast to {@link RMContextImpl} here.
+   * @param nextProcessor next processor in the AMS processing chain.
+   */
+  @Override
+  public void init(ApplicationMasterServiceContext amsContext,
+      ApplicationMasterServiceProcessor nextProcessor) {
+    LOG.info("Initializing Constraint Placement Processor:");
+    this.nextAMSProcessor = nextProcessor;
+
+    // Cast once; every service used below is reached through this context.
+    final RMContextImpl rmContext = (RMContextImpl) amsContext;
+    this.constraintManager = rmContext.getPlacementConstraintManager();
+    this.scheduler = (AbstractYarnScheduler) rmContext.getScheduler();
+
+    // getInstances is used for convenience, but only the first configured
+    // class is honoured even when a comma separated list is provided.
+    List<ConstraintPlacementAlgorithm> configuredAlgorithms =
+        rmContext.getYarnConfiguration().getInstances(
+            YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS,
+            ConstraintPlacementAlgorithm.class);
+    ConstraintPlacementAlgorithm algorithm =
+        (configuredAlgorithms == null || configuredAlgorithms.isEmpty())
+            ? new SamplePlacementAlgorithm()
+            : configuredAlgorithms.get(0);
+    LOG.info("Planning Algorithm [{}]", algorithm.getClass().getName());
+
+    int algoPSize = rmContext.getYarnConfiguration().getInt(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE,
+        YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE);
+    PlacementDispatcher dispatcher = new PlacementDispatcher();
+    this.placementDispatcher = dispatcher;
+    dispatcher.init(rmContext, algorithm, algoPSize);
+    LOG.info("Planning Algorithm pool size [{}]", algoPSize);
+
+    int schedPSize = rmContext.getYarnConfiguration().getInt(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE,
+        YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE);
+    this.schedulingThreadPool = Executors.newFixedThreadPool(schedPSize);
+    LOG.info("Scheduler pool size [{}]", schedPSize);
+
+    // Upper bound on how often a request the scheduler could not satisfy
+    // is sent back for another placement round.
+    this.retryAttempts = rmContext.getYarnConfiguration().getInt(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS);
+    LOG.info("Num retry attempts [{}]", this.retryAttempts);
+  }
+
+  /**
+   * Registers the placement constraints carried by the register request
+   * (if any) and then hands the call to the next processor in the chain.
+   */
+  @Override
+  public void registerApplicationMaster(ApplicationAttemptId appAttemptId,
+      RegisterApplicationMasterRequest request,
+      RegisterApplicationMasterResponse response)
+      throws IOException, YarnException {
+    final Map<Set<String>, PlacementConstraint> constraintsByTags =
+        request.getPlacementConstraints();
+    final ApplicationId appId = appAttemptId.getApplicationId();
+    processPlacementConstraints(appId, constraintsByTags);
+    nextAMSProcessor.registerApplicationMaster(appAttemptId, request, response);
+  }
+
+  /**
+   * Registers the application with the constraint manager when it supplied
+   * at least one placement constraint; does nothing otherwise.
+   */
+  private void processPlacementConstraints(ApplicationId applicationId,
+      Map<Set<String>, PlacementConstraint> appPlacementConstraints) {
+    if (appPlacementConstraints == null
+        || appPlacementConstraints.isEmpty()) {
+      // Nothing to register for this application.
+      return;
+    }
+    LOG.info("Constraints added for application [{}] against tags [{}]",
+        applicationId, appPlacementConstraints);
+    constraintManager.registerApplication(
+        applicationId, appPlacementConstraints);
+  }
+
+  /**
+   * Handles an allocate call: dispatches the new scheduling requests for
+   * placement, re-dispatches requests queued for retry, submits already
+   * placed requests to the scheduler, delegates to the next processor and
+   * finally reports rejected requests in the response.
+   */
+  @Override
+  public void allocate(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse response) throws YarnException {
+    // 1. New requests carried by this allocate call.
+    dispatchRequestsForPlacement(
+        appAttemptId, request.getSchedulingRequests());
+    // 2. Requests whose earlier scheduling attempt failed.
+    reDispatchRetryableRequests(appAttemptId);
+    // 3. Requests for which the algorithm has already picked nodes.
+    schedulePlacedRequests(appAttemptId);
+
+    nextAMSProcessor.allocate(appAttemptId, request, response);
+
+    // 4. Rejections are appended after the rest of the chain has run.
+    handleRejectedRequests(appAttemptId, response);
+  }
+
+  /**
+   * Hands a non-empty list of scheduling requests to the placement
+   * dispatcher as a single batch with placement attempt 1.
+   */
+  private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
+      List<SchedulingRequest> schedulingRequests) {
+    if (schedulingRequests == null || schedulingRequests.isEmpty()) {
+      return;
+    }
+    BatchedRequests firstAttemptBatch = new BatchedRequests(
+        appAttemptId.getApplicationId(), schedulingRequests, 1);
+    this.placementDispatcher.dispatch(firstAttemptBatch);
+  }
+
+  /**
+   * Re-dispatches every batch of requests queued for retry for the given
+   * application and empties the retry queue.
+   *
+   * The retry list is filled by scheduling pool threads while holding the
+   * list's monitor, so it is only inspected and drained here under that
+   * same monitor. The batches are dispatched after the lock is released to
+   * keep the critical section short.
+   *
+   * @param appAttId attempt whose application's retry queue is drained.
+   */
+  private void reDispatchRetryableRequests(ApplicationAttemptId appAttId) {
+    List<BatchedRequests> reqsToRetry =
+        this.requestsToRetry.get(appAttId.getApplicationId());
+    if (reqsToRetry == null) {
+      return;
+    }
+    List<BatchedRequests> drained;
+    synchronized (reqsToRetry) {
+      if (reqsToRetry.isEmpty()) {
+        return;
+      }
+      // Snapshot and clear atomically so no batch is lost or sent twice.
+      drained = new ArrayList<>(reqsToRetry);
+      reqsToRetry.clear();
+    }
+    for (BatchedRequests bReq : drained) {
+      this.placementDispatcher.dispatch(bReq);
+    }
+  }
+
+  /**
+   * Pulls the requests the placement algorithm has placed for the
+   * application and, for every (request, node) pair, submits a task to the
+   * scheduling pool that attempts the allocation on that node and feeds the
+   * outcome to {@link #handleSchedulingResponse}.
+   *
+   * An exception thrown by the allocation attempt is logged and treated as
+   * an unsuccessful attempt; otherwise it would be captured by the discarded
+   * Future and the request would be neither retried nor rejected.
+   *
+   * @param appAttemptId attempt on whose behalf the allocations are made.
+   */
+  private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) {
+    ApplicationId applicationId = appAttemptId.getApplicationId();
+    List<PlacedSchedulingRequest> placedSchedulingRequests =
+        this.placementDispatcher.pullPlacedRequests(applicationId);
+    if (placedSchedulingRequests == null
+        || placedSchedulingRequests.isEmpty()) {
+      return;
+    }
+    // The attempt is the same for every request and node: look it up once.
+    final SchedulerApplicationAttempt applicationAttempt =
+        this.scheduler.getApplicationAttempt(appAttemptId);
+    for (PlacedSchedulingRequest placedReq : placedSchedulingRequests) {
+      SchedulingRequest sReq = placedReq.getSchedulingRequest();
+      for (SchedulerNode node : placedReq.getNodes()) {
+        // Each node gets its own copy of the request, built from the
+        // original's fields with a fresh ResourceSizing.
+        final SchedulingRequest sReqClone =
+            SchedulingRequest.newInstance(sReq.getAllocationRequestId(),
+                sReq.getPriority(), sReq.getExecutionType(),
+                sReq.getAllocationTags(),
+                ResourceSizing.newInstance(
+                    sReq.getResourceSizing().getResources()),
+                sReq.getPlacementConstraint());
+        Runnable task = () -> {
+          boolean success = false;
+          try {
+            success = scheduler.attemptAllocationOnNode(
+                applicationAttempt, sReqClone, node);
+          } catch (RuntimeException e) {
+            LOG.error("Allocation attempt [{}] for [{}] threw an exception",
+                placedReq.getPlacementAttempt(), sReqClone, e);
+          }
+          if (!success) {
+            LOG.warn("Unsuccessful allocation attempt [{}] for [{}]",
+                placedReq.getPlacementAttempt(), sReqClone);
+          }
+          handleSchedulingResponse(
+              new Response(success, applicationId, sReqClone,
+              placedReq.getPlacementAttempt(), node));
+        };
+        this.schedulingThreadPool.submit(task);
+      }
+    }
+  }
+
+  /**
+   * Adds to the allocate response every request of the application that was
+   * rejected, either by the placement algorithm
+   * ({@code COULD_NOT_PLACE_ON_NODE}) or because the scheduler could not
+   * satisfy it within the allowed number of attempts
+   * ({@code COULD_NOT_SCHEDULE_ON_NODE}).
+   *
+   * The second list is appended to by scheduling pool threads while holding
+   * the list's monitor, so it is inspected and cleared only under that
+   * monitor.
+   *
+   * @param appAttemptId attempt whose application's rejections are reported.
+   * @param response allocate response the rejections are added to.
+   */
+  private void handleRejectedRequests(ApplicationAttemptId appAttemptId,
+      AllocateResponse response) {
+    List<SchedulingRequest> rejectedRequests =
+        this.placementDispatcher.pullRejectedRequests(
+            appAttemptId.getApplicationId());
+    if (rejectedRequests != null && !rejectedRequests.isEmpty()) {
+      LOG.warn("Following requests of [{}] were rejected by" +
+              " the PlacementAlgorithmOutput Algorithm: {}",
+          appAttemptId.getApplicationId(), rejectedRequests);
+      ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
+          rejectedRequests.stream()
+              .map(sr -> RejectedSchedulingRequest.newInstance(
+                  RejectionReason.COULD_NOT_PLACE_ON_NODE, sr))
+              .collect(Collectors.toList()));
+    }
+    List<SchedulingRequest> exhaustedRequests =
+        this.requestsToReject.get(appAttemptId.getApplicationId());
+    if (exhaustedRequests != null) {
+      synchronized (exhaustedRequests) {
+        // Emptiness is checked under the lock: pool threads may be adding.
+        if (!exhaustedRequests.isEmpty()) {
+          LOG.warn("Following requests of [{}] exhausted all retry attempts " +
+                  "trying to schedule on placed node: {}",
+              appAttemptId.getApplicationId(), exhaustedRequests);
+          ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(
+              response,
+              exhaustedRequests.stream()
+                  .map(sr -> RejectedSchedulingRequest.newInstance(
+                      RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, sr))
+                  .collect(Collectors.toList()));
+          exhaustedRequests.clear();
+        }
+      }
+    }
+  }
+
+  /**
+   * Drops all per-application state held by the constraint manager, the
+   * placement dispatcher and this processor, then delegates to the next
+   * processor in the chain.
+   */
+  @Override
+  public void finishApplicationMaster(ApplicationAttemptId appAttemptId,
+      FinishApplicationMasterRequest request,
+      FinishApplicationMasterResponse response) {
+    final ApplicationId finishedAppId = appAttemptId.getApplicationId();
+    constraintManager.unregisterApplication(finishedAppId);
+    placementDispatcher.clearApplicationState(finishedAppId);
+    requestsToReject.remove(finishedAppId);
+    requestsToRetry.remove(finishedAppId);
+    nextAMSProcessor.finishApplicationMaster(appAttemptId, request, response);
+  }
+
+  private void handleSchedulingResponse(SchedulingResponse schedulerResponse) {
+    int placementAttempt = ((Response)schedulerResponse).placementAttempt;
+    // Retry this placement as it is not successful and we are still
+    // under max retry. The req is batched with other unsuccessful
+    // requests from the same app
+    if (!schedulerResponse.isSuccess() && placementAttempt < retryAttempts) {
+      List<BatchedRequests> reqsToRetry =
+          requestsToRetry.computeIfAbsent(
+              schedulerResponse.getApplicationId(),
+              k -> new ArrayList<>());
+      synchronized (reqsToRetry) {
+        addToRetryList(schedulerResponse, placementAttempt, reqsToRetry);
+      }
+      LOG.warn("Going to retry request for application [{}] after [{}]" +
+              " attempts: [{}]", schedulerResponse.getApplicationId(),
+          placementAttempt, schedulerResponse.getSchedulingRequest());
+    } else {
+      if (!schedulerResponse.isSuccess()) {
+        LOG.warn("Not retrying request for application [{}] after [{}]" +
+                " attempts: [{}]", schedulerResponse.getApplicationId(),
+            placementAttempt, schedulerResponse.getSchedulingRequest());
+        List<SchedulingRequest> reqsToReject =
+            requestsToReject.computeIfAbsent(
+                schedulerResponse.getApplicationId(),
+                k -> new ArrayList<>());
+        synchronized (reqsToReject) {
+          reqsToReject.add(schedulerResponse.getSchedulingRequest());
+        }
+      }
+    }
+  }
+
+  private void addToRetryList(SchedulingResponse schedulerResponse,
+      int placementAttempt, List<BatchedRequests> reqsToRetry) {
+    boolean isAdded = false;
+    for (BatchedRequests br : reqsToRetry) {
+      if (br.getPlacementAttempt() == placementAttempt + 1) {
+        br.addToBatch(schedulerResponse.getSchedulingRequest());
+        br.addToBlacklist(
+            schedulerResponse.getSchedulingRequest().getAllocationTags(),
+            ((Response) schedulerResponse).attemptedNode);
+        isAdded = true;
+        break;
+      }
+    }
+    if (!isAdded) {
+      BatchedRequests br =
+          new BatchedRequests(schedulerResponse.getApplicationId(),
+              Collections.singleton(
+                  schedulerResponse.getSchedulingRequest()),
+              placementAttempt + 1);
+      reqsToRetry.add(br);
+      br.addToBlacklist(
+          schedulerResponse.getSchedulingRequest().getAllocationTags(),
+          ((Response) schedulerResponse).attemptedNode);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java
new file mode 100644
index 0000000..8d49801
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SpecializedConstraintTransformer;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Sample Test algorithm. Assumes anti-affinity always.
+ * It also assumes the numAllocations in resource sizing is always = 1.
+ *
+ * NOTE: This is just a sample implementation. Not to be actually used.
+ */
+public class SamplePlacementAlgorithm implements ConstraintPlacementAlgorithm {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SamplePlacementAlgorithm.class);
+
+  private AllocationTagsManager tagsManager;
+  private PlacementConstraintManager constraintManager;
+  private NodeCandidateSelector nodeSelector;
+
+  /**
+   * Picks up the allocation tags manager and the placement constraint
+   * manager from the RM context and installs a node selector that asks the
+   * scheduler (resolved from the context on every call) for its nodes.
+   */
+  @Override
+  public void init(RMContext rmContext) {
+    this.tagsManager = rmContext.getAllocationTagsManager();
+    this.constraintManager = rmContext.getPlacementConstraintManager();
+    this.nodeSelector = filter -> {
+      AbstractYarnScheduler yarnScheduler =
+          (AbstractYarnScheduler) rmContext.getScheduler();
+      return yarnScheduler.getNodes(filter);
+    };
+  }
+
+  /**
+   * Places the batched requests of one application. Requests are indexed by
+   * allocation tag; for every tag that has a registered constraint, nodes
+   * are walked in order and a request is placed on each node on which the
+   * constraint's target tag has cardinality 0 and which is not blacklisted
+   * for the tag. Requests that still have numAllocations &gt; 0 at the end
+   * are reported as rejected. The result is handed to the collector.
+   *
+   * @param input the requests to place; cast to {@link BatchedRequests}.
+   * @param collector receives the placed and rejected requests.
+   */
+  @Override
+  public void place(ConstraintPlacementAlgorithmInput input,
+      ConstraintPlacementAlgorithmOutputCollector collector) {
+    BatchedRequests requests = (BatchedRequests)input;
+    ConstraintPlacementAlgorithmOutput resp =
+        new ConstraintPlacementAlgorithmOutput(requests.getApplicationId());
+    // A null filter is passed to the selector.
+    // NOTE(review): presumably this selects every node - confirm.
+    List<SchedulerNode> allNodes = nodeSelector.selectNodes(null);
+    // Index the requests by allocation tag. A request with several tags is
+    // listed under each of them; requests without tags are not indexed.
+    Map<String, List<SchedulingRequest>> tagIndexedRequests = new HashMap<>();
+    requests.getSchedulingRequests()
+        .stream()
+        .filter(r -> r.getAllocationTags() != null)
+        .forEach(
+            req -> req.getAllocationTags().forEach(
+                tag -> tagIndexedRequests.computeIfAbsent(tag,
+                    k -> new ArrayList<>()).add(req))
+        );
+    for (Map.Entry<String, List<SchedulingRequest>> entry :
+        tagIndexedRequests.entrySet()) {
+      String tag = entry.getKey();
+      // Tags with no registered constraint are skipped entirely.
+      PlacementConstraint constraint =
+          constraintManager.getConstraint(requests.getApplicationId(),
+              Collections.singleton(tag));
+      if (constraint != null) {
+        // Currently works only for simple anti-affinity
+        // NODE scope target expressions
+        SpecializedConstraintTransformer transformer =
+            new SpecializedConstraintTransformer(constraint);
+        PlacementConstraint transform = transformer.transform();
+        TargetConstraint targetConstraint =
+            (TargetConstraint) transform.getConstraintExpr();
+        // Assume a single target expression tag;
+        // The Sample Algorithm assumes a constraint will always be a simple
+        // Target Constraint with a single entry in the target set.
+        // As mentioned in the class javadoc - This algorithm should be
+        // used mostly for testing and validating end-2-end workflow.
+        String targetTag =
+            targetConstraint.getTargetExpressions().iterator().next()
+            .getTargetValues().iterator().next();
+        // iterate over all nodes
+        // The node iterator is created once per tag and shared by all
+        // requests of that tag, so a node is visited at most once per tag.
+        Iterator<SchedulerNode> nodeIter = allNodes.iterator();
+        List<SchedulingRequest> schedulingRequests = entry.getValue();
+        Iterator<SchedulingRequest> reqIter = schedulingRequests.iterator();
+        while (reqIter.hasNext()) {
+          SchedulingRequest sReq = reqIter.next();
+          int numAllocs = sReq.getResourceSizing().getNumAllocations();
+          while (numAllocs > 0 && nodeIter.hasNext()) {
+            SchedulerNode node = nodeIter.next();
+            long nodeCardinality = 0;
+            try {
+              nodeCardinality = tagsManager.getNodeCardinality(
+                  node.getNodeID(), requests.getApplicationId(),
+                  targetTag);
+              if (nodeCardinality == 0 &&
+                  !requests.getBlacklist(tag).contains(node.getNodeID())) {
+                numAllocs--;
+                // The remaining count is written back into the incoming
+                // request; the rejection filter below relies on it.
+                sReq.getResourceSizing().setNumAllocations(numAllocs);
+                // One placed request, carrying this single node, is emitted
+                // per allocation.
+                PlacedSchedulingRequest placedReq =
+                    new PlacedSchedulingRequest(sReq);
+                placedReq.setPlacementAttempt(requests.getPlacementAttempt());
+                placedReq.getNodes().add(node);
+                resp.getPlacedRequests().add(placedReq);
+              }
+            } catch (InvalidAllocationTagsQueryException e) {
+              // The node is skipped; the walk continues with the next node.
+              LOG.warn("Got exception from TagManager !", e);
+            }
+          }
+        }
+      }
+    }
+    // Add all requests whose numAllocations still > 0 to rejected list.
+    requests.getSchedulingRequests().stream()
+        .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0)
+        .forEach(rejReq -> resp.getRejectedRequests().add(rejReq));
+    collector.collect(resp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/package-info.java
new file mode 100644
index 0000000..7090154
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package o.a.h.yarn.server.resourcemanager.scheduler.constraint.processor
+ * contains classes related to scheduling containers using placement
+ * processor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 12dfe18..975abe6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -21,7 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -39,7 +42,9 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -57,6 +62,9 @@ public class MockAM {
   private ApplicationMasterProtocol amRMProtocol;
   private UserGroupInformation ugi;
   private volatile AllocateResponse lastResponse;
+  private Map<Set<String>, PlacementConstraint> placementConstraints =
+      new HashMap<>();
+  private List<SchedulingRequest> schedulingRequests = new ArrayList<>();
 
   private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
   private final List<ContainerId> releases = new ArrayList<ContainerId>();
@@ -93,6 +101,16 @@ public class MockAM {
     return registerAppAttempt(true);
   }
 
+  /**
+   * Records a placement constraint for the given set of tags. The recorded
+   * constraints are set on the register request when it is built, provided
+   * at least one was added. An existing entry for the same tag set is
+   * replaced.
+   */
+  public void addPlacementConstraint(Set<String> tags,
+      PlacementConstraint constraint) {
+    placementConstraints.put(tags, constraint);
+  }
+
+  /**
+   * Queues scheduling requests to be attached to an allocate request built
+   * by {@code allocate(List, List)}.
+   *
+   * @return this instance, so calls can be chained.
+   */
+  public MockAM addSchedulingRequest(List<SchedulingRequest> reqs) {
+    schedulingRequests.addAll(reqs);
+    return this;
+  }
+
   public RegisterApplicationMasterResponse registerAppAttempt(boolean wait)
       throws Exception {
     if (wait) {
@@ -104,6 +122,9 @@ public class MockAM {
     req.setHost("");
     req.setRpcPort(1);
     req.setTrackingUrl("");
+    if (!placementConstraints.isEmpty()) {
+      req.setPlacementConstraints(this.placementConstraints);
+    }
     if (ugi == null) {
       ugi = UserGroupInformation.createRemoteUser(
           attemptId.toString());
@@ -247,12 +268,17 @@ public class MockAM {
 
   }
 
+
   public AllocateResponse allocate(
       List<ResourceRequest> resourceRequest, List<ContainerId> releases)
       throws Exception {
     final AllocateRequest req =
         AllocateRequest.newInstance(0, 0F, resourceRequest,
           releases, null);
+    // Queued scheduling requests are sent with exactly one allocate call.
+    // NOTE(review): the list is cleared right after being handed over; this
+    // presumably relies on setSchedulingRequests copying it - confirm.
+    if (!schedulingRequests.isEmpty()) {
+      req.setSchedulingRequests(schedulingRequests);
+      schedulingRequests.clear();
+    }
     return allocate(req);
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 302f5b3..f0e4213 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -1240,6 +1242,18 @@ public class MockRM extends ResourceManager {
     return am;
   }
 
+  public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm,
+      Map<Set<String>, PlacementConstraint> constraints) throws Exception {
+    MockAM am = launchAM(app, rm, nm);
+    for (Map.Entry<Set<String>, PlacementConstraint> e :
+        constraints.entrySet()) {
+      am.addPlacementConstraint(e.getKey(), e.getValue());
+    }
+    am.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+    return am;
+  }
+
   public ApplicationReport getApplicationReport(ApplicationId appId)
       throws YarnException, IOException {
     ApplicationClientProtocol client = getClientRMService();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
new file mode 100644
index 0000000..db8ae15
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
+import org.apache.hadoop.yarn.api.records.RejectionReason;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+
+/**
+ * Tests the end-to-end workflow of the constraint placement framework.
+ */
+public class TestPlacementProcessor {
+
+  private static final int GB = 1024;
+
+  private static final Log LOG =
+      LogFactory.getLog(TestPlacementProcessor.class);
+  private MockRM rm;
+  private DrainDispatcher dispatcher;
+
+  @Before
+  public void createAndStartRM() {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 1);
+    startRM(conf);
+  }
+
+  private void startRM(final YarnConfiguration conf) {
+    dispatcher = new DrainDispatcher();
+    rm = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+  }
+
+  @After
+  public void stopRM() {
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testPlacement() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(
+            Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
+        ));
+    am1.addSchedulingRequest(
+        Arrays.asList(
+            schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 5, 1, 512, "foo"))
+    );
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+    // kick the scheduler
+
+    while (allocatedContainers.size() < 4) {
+      nm1.nodeHeartbeat(true);
+      nm2.nodeHeartbeat(true);
+      nm3.nodeHeartbeat(true);
+      nm4.nodeHeartbeat(true);
+      LOG.info("Waiting for containers to be created for app 1...");
+      sleep(1000);
+      allocResponse = am1.schedule();
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+    }
+
+    Assert.assertEquals(4, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream()
+        .map(x -> x.getNodeId()).collect(Collectors.toSet());
+    // Ensure unique nodes
+    Assert.assertEquals(4, nodeIds.size());
+  }
+
+  @Test(timeout = 300000)
+  public void testSchedulerRejection() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(
+            Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
+        ));
+    am1.addSchedulingRequest(
+        Arrays.asList(
+            schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            // Ask for a container larger than the node
+            schedulingRequest(1, 4, 1, 5120, "foo"))
+    );
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    List<RejectedSchedulingRequest> rejectedReqs = new ArrayList<>();
+    int allocCount = 1;
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+    rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+
+    // kick the scheduler
+
+    while (allocCount < 11) {
+      nm1.nodeHeartbeat(true);
+      nm2.nodeHeartbeat(true);
+      nm3.nodeHeartbeat(true);
+      nm4.nodeHeartbeat(true);
+      LOG.info("Waiting for containers to be created for app 1...");
+      sleep(1000);
+      allocResponse = am1.schedule();
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+      rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+      allocCount++;
+      if (rejectedReqs.size() > 0 && allocatedContainers.size() > 2) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(3, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream()
+        .map(x -> x.getNodeId()).collect(Collectors.toSet());
+    // Ensure unique nodes
+    Assert.assertEquals(3, nodeIds.size());
+    RejectedSchedulingRequest rej = rejectedReqs.get(0);
+    Assert.assertEquals(4, rej.getRequest().getAllocationRequestId());
+    Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
+        rej.getReason());
+  }
+
+  @Test(timeout = 300000)
+  public void testRePlacementAfterSchedulerRejection() throws Exception {
+    stopRM();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 2);
+    startRM(conf);
+
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    MockNM nm5 = new MockNM("h5:1234", 8192, rm.getResourceTrackerService());
+    nodes.put(nm5.getNodeId(), nm5);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+    // Do not register nm5 yet.
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(
+            Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
+        ));
+    am1.addSchedulingRequest(
+        Arrays.asList(
+            schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            // Larger than nm1-nm4 (4096); only nm5 (8192) can fit it
+            schedulingRequest(1, 4, 1, 5120, "foo"))
+    );
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    List<RejectedSchedulingRequest> rejectedReqs = new ArrayList<>();
+    int allocCount = 1;
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+    rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+
+    // Register nm5 only after the first allocate call, so that the initial
+    // placement of the large scheduling request goes to some other node.
+    nm5.registerNode();
+
+    // kick the scheduler
+    while (allocCount < 11) {
+      nm1.nodeHeartbeat(true);
+      nm2.nodeHeartbeat(true);
+      nm3.nodeHeartbeat(true);
+      nm4.nodeHeartbeat(true);
+      nm5.nodeHeartbeat(true);
+      LOG.info("Waiting for containers to be created for app 1...");
+      sleep(1000);
+      allocResponse = am1.schedule();
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+      rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+      allocCount++;
+      if (allocatedContainers.size() > 3) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(4, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream()
+        .map(x -> x.getNodeId()).collect(Collectors.toSet());
+    // Ensure unique nodes
+    Assert.assertEquals(4, nodeIds.size());
+  }
+
+  @Test(timeout = 300000)
+  public void testPlacementRejection() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(
+            Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
+        ));
+    am1.addSchedulingRequest(
+        Arrays.asList(
+            schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 4, 1, 512, "foo"),
+            // Ask for more containers than nodes
+            schedulingRequest(1, 5, 1, 512, "foo"))
+    );
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    List<RejectedSchedulingRequest> rejectedReqs = new ArrayList<>();
+    int allocCount = 1;
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+    rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+
+    // kick the scheduler
+
+    while (allocCount < 11) {
+      nm1.nodeHeartbeat(true);
+      nm2.nodeHeartbeat(true);
+      nm3.nodeHeartbeat(true);
+      nm4.nodeHeartbeat(true);
+      LOG.info("Waiting for containers to be created for app 1...");
+      sleep(1000);
+      allocResponse = am1.schedule();
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+      rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+      allocCount++;
+      if (rejectedReqs.size() > 0 && allocatedContainers.size() > 3) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(4, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream()
+        .map(x -> x.getNodeId()).collect(Collectors.toSet());
+    // Ensure unique nodes
+    Assert.assertEquals(4, nodeIds.size());
+    RejectedSchedulingRequest rej = rejectedReqs.get(0);
+    Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE,
+        rej.getReason());
+  }
+
+  private static SchedulingRequest schedulingRequest(
+      int priority, long allocReqId, int cores, int mem, String... tags) {
+    return schedulingRequest(priority, allocReqId, cores, mem,
+        ExecutionType.GUARANTEED, tags);
+  }
+
+  private static SchedulingRequest schedulingRequest(
+      int priority, long allocReqId, int cores, int mem,
+      ExecutionType execType, String... tags) {
+    return SchedulingRequest.newBuilder()
+        .priority(Priority.newInstance(priority))
+        .allocationRequestId(allocReqId)
+        .allocationTags(new HashSet<>(Arrays.asList(tags)))
+        .executionType(ExecutionTypeRequest.newInstance(execType, true))
+        .resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(mem, cores)))
+        .build();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org