You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/24 22:48:16 UTC

[15/42] hadoop git commit: YARN-7613. Implement Basic algorithm for constraint based placement. (Panagiotis Garefalakis via asuresh)

YARN-7613. Implement Basic algorithm for constraint based placement. (Panagiotis Garefalakis via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/59f0261a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/59f0261a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/59f0261a

Branch: refs/heads/YARN-6592
Commit: 59f0261a1ddd119dceb524f2c31224d50da3bff1
Parents: c245807
Author: Arun Suresh <as...@apache.org>
Authored: Wed Dec 27 22:59:22 2017 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 24 14:20:34 2018 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   4 +
 .../src/main/resources/yarn-default.xml         |   8 +-
 .../rmcontainer/RMContainerImpl.java            |  10 +-
 .../constraint/AllocationTagsManager.java       | 121 ++++++++++---
 .../algorithm/DefaultPlacementAlgorithm.java    | 172 +++++++++++++++++++
 .../iterators/PopularTagsIterator.java          |  71 ++++++++
 .../algorithm/iterators/SerialIterator.java     |  53 ++++++
 .../algorithm/iterators/package-info.java       |  29 ++++
 .../constraint/algorithm/package-info.java      |  29 ++++
 .../constraint/processor/BatchedRequests.java   |  45 ++++-
 .../processor/PlacementProcessor.java           |  32 ++--
 .../processor/SamplePlacementAlgorithm.java     | 144 ----------------
 .../constraint/TestAllocationTagsManager.java   | 156 ++++++++++++-----
 .../TestBatchedRequestsIterators.java           |  82 +++++++++
 .../constraint/TestPlacementProcessor.java      |   4 +-
 15 files changed, 721 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 61eb4f8..bfda5bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -536,6 +536,10 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS =
       RM_PREFIX + "placement-constraints.algorithm.class";
 
+  /** Used for BasicPlacementAlgorithm - default SERIAL. **/
+  public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR =
+      RM_PREFIX + "placement-constraints.algorithm.iterator";
+
   public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED =
       RM_PREFIX + "placement-constraints.enabled";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index df4e8b6..6e3c8b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -145,7 +145,13 @@
   <property>
     <description>Constraint Placement Algorithm to be used.</description>
     <name>yarn.resourcemanager.placement-constraints.algorithm.class</name>
-    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SamplePlacementAlgorithm</value>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm</value>
+  </property>
+
+  <property>
+    <description>Placement Algorithm Requests Iterator to be used.</description>
+    <name>yarn.resourcemanager.placement-constraints.algorithm.iterator</name>
+    <value>SERIAL</value>
   </property>
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index c873509..2c4ef7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -579,9 +579,8 @@ public class RMContainerImpl implements RMContainer {
     public void transition(RMContainerImpl container, RMContainerEvent event) {
       // Notify placementManager
       container.rmContext.getAllocationTagsManager().addContainer(
-          container.getNodeId(),
-          container.getApplicationAttemptId().getApplicationId(),
-          container.getContainerId(), container.getAllocationTags());
+          container.getNodeId(), container.getContainerId(),
+          container.getAllocationTags());
 
       container.eventHandler.handle(new RMAppAttemptEvent(
           container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
@@ -696,9 +695,8 @@ public class RMContainerImpl implements RMContainer {
     public void transition(RMContainerImpl container, RMContainerEvent event) {
       // Notify placementManager
       container.rmContext.getAllocationTagsManager().removeContainer(
-          container.getNodeId(),
-          container.getApplicationAttemptId().getApplicationId(),
-          container.getContainerId(), container.getAllocationTags());
+          container.getNodeId(), container.getContainerId(),
+          container.getAllocationTags());
 
       RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
index 7b0b959..4bb3e79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -54,24 +55,27 @@ public class AllocationTagsManager {
   private final RMContext rmContext;
 
   // Application's tags to Node
-  private Map<ApplicationId, NodeToCountedTags> perAppNodeMappings =
+  private Map<ApplicationId, TypeToCountedTags> perAppNodeMappings =
       new HashMap<>();
   // Application's tags to Rack
-  private Map<ApplicationId, NodeToCountedTags> perAppRackMappings =
+  private Map<ApplicationId, TypeToCountedTags> perAppRackMappings =
       new HashMap<>();
+  // Application's Temporary containers mapping
+  private Map<ApplicationId, Map<NodeId, Map<ContainerId, Set<String>>>>
+      appTempMappings = new HashMap<>();
 
   // Global tags to node mapping (used to fast return aggregated tags
   // cardinality across apps)
-  private NodeToCountedTags<NodeId> globalNodeMapping = new NodeToCountedTags();
+  private TypeToCountedTags<NodeId> globalNodeMapping = new TypeToCountedTags();
   // Global tags to Rack mapping
-  private NodeToCountedTags<String> globalRackMapping = new NodeToCountedTags();
+  private TypeToCountedTags<String> globalRackMapping = new TypeToCountedTags();
 
   /**
    * Generic store mapping type <T> to counted tags.
    * Currently used both for NodeId to Tag, Count and Rack to Tag, Count
    */
   @VisibleForTesting
-  static class NodeToCountedTags<T> {
+  static class TypeToCountedTags<T> {
     // Map<Type, Map<Tag, Count>>
     private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>();
 
@@ -209,25 +213,31 @@ public class AllocationTagsManager {
   }
 
   @VisibleForTesting
-  Map<ApplicationId, NodeToCountedTags> getPerAppNodeMappings() {
+  Map<ApplicationId, TypeToCountedTags> getPerAppNodeMappings() {
     return perAppNodeMappings;
   }
 
   @VisibleForTesting
-  Map<ApplicationId, NodeToCountedTags> getPerAppRackMappings() {
+  Map<ApplicationId, TypeToCountedTags> getPerAppRackMappings() {
     return perAppRackMappings;
   }
 
   @VisibleForTesting
-  NodeToCountedTags getGlobalNodeMapping() {
+  TypeToCountedTags getGlobalNodeMapping() {
     return globalNodeMapping;
   }
 
   @VisibleForTesting
-  NodeToCountedTags getGlobalRackMapping() {
+  TypeToCountedTags getGlobalRackMapping() {
     return globalRackMapping;
   }
 
+  @VisibleForTesting
+  public Map<NodeId, Map<ContainerId, Set<String>>> getAppTempMappings(
+      ApplicationId applicationId) {
+    return appTempMappings.get(applicationId);
+  }
+
   public AllocationTagsManager(RMContext context) {
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
@@ -235,18 +245,52 @@ public class AllocationTagsManager {
     rmContext = context;
   }
 
+  //
+
+  /**
+   * Method adds a temporary fake-container tag to Node mapping.
+   * Used by the constrained placement algorithm to keep track of containers
+   * that are currently placed on nodes but are not yet allocated.
+   * @param nodeId
+   * @param applicationId
+   * @param allocationTags
+   */
+  public void addTempContainer(NodeId nodeId, ApplicationId applicationId,
+      Set<String> allocationTags) {
+    ContainerId tmpContainer = ContainerId.newContainerId(
+        ApplicationAttemptId.newInstance(applicationId, 1), System.nanoTime());
+
+    writeLock.lock();
+    try {
+      Map<NodeId, Map<ContainerId, Set<String>>> appTempMapping =
+          appTempMappings.computeIfAbsent(applicationId, k -> new HashMap<>());
+      Map<ContainerId, Set<String>> containerTempMapping =
+          appTempMapping.computeIfAbsent(nodeId, k -> new HashMap<>());
+      containerTempMapping.put(tmpContainer, allocationTags);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added TEMP container=" + tmpContainer + " with tags=["
+            + StringUtils.join(allocationTags, ",") + "]");
+      }
+    } finally {
+      writeLock.unlock();
+    }
+
+    addContainer(nodeId, tmpContainer, allocationTags);
+  }
+
   /**
    * Notify container allocated on a node.
    *
    * @param nodeId         allocated node.
-   * @param applicationId  applicationId
    * @param containerId    container id.
    * @param allocationTags allocation tags, see
    *                       {@link SchedulingRequest#getAllocationTags()}
    *                       application_id will be added to allocationTags.
    */
-  public void addContainer(NodeId nodeId, ApplicationId applicationId,
-      ContainerId containerId, Set<String> allocationTags) {
+  public void addContainer(NodeId nodeId, ContainerId containerId,
+      Set<String> allocationTags) {
+    ApplicationId applicationId =
+        containerId.getApplicationAttemptId().getApplicationId();
     String applicationIdTag =
         AllocationTagsNamespaces.APP_ID + applicationId.toString();
 
@@ -260,10 +304,10 @@ public class AllocationTagsManager {
 
     writeLock.lock();
     try {
-      NodeToCountedTags perAppTagsMapping = perAppNodeMappings
-          .computeIfAbsent(applicationId, k -> new NodeToCountedTags());
-      NodeToCountedTags perAppRackTagsMapping = perAppRackMappings
-          .computeIfAbsent(applicationId, k -> new NodeToCountedTags());
+      TypeToCountedTags perAppTagsMapping = perAppNodeMappings
+          .computeIfAbsent(applicationId, k -> new TypeToCountedTags());
+      TypeToCountedTags perAppRackTagsMapping = perAppRackMappings
+          .computeIfAbsent(applicationId, k -> new TypeToCountedTags());
       // Covering test-cases where context is mocked
       String nodeRack = (rmContext.getRMNodes() != null
           && rmContext.getRMNodes().get(nodeId) != null)
@@ -294,12 +338,13 @@ public class AllocationTagsManager {
    * Notify container removed.
    *
    * @param nodeId         nodeId
-   * @param applicationId  applicationId
    * @param containerId    containerId.
    * @param allocationTags allocation tags for given container
    */
-  public void removeContainer(NodeId nodeId, ApplicationId applicationId,
+  public void removeContainer(NodeId nodeId,
       ContainerId containerId, Set<String> allocationTags) {
+    ApplicationId applicationId =
+        containerId.getApplicationAttemptId().getApplicationId();
     String applicationIdTag =
         AllocationTagsNamespaces.APP_ID + applicationId.toString();
     boolean useSet = false;
@@ -313,9 +358,9 @@ public class AllocationTagsManager {
 
     writeLock.lock();
     try {
-      NodeToCountedTags perAppTagsMapping =
+      TypeToCountedTags perAppTagsMapping =
           perAppNodeMappings.get(applicationId);
-      NodeToCountedTags perAppRackTagsMapping =
+      TypeToCountedTags perAppRackTagsMapping =
           perAppRackMappings.get(applicationId);
       if (perAppTagsMapping == null) {
         return;
@@ -354,6 +399,34 @@ public class AllocationTagsManager {
   }
 
   /**
+   * Method removes temporary containers associated with an application
+   * Used by the placement algorithm to clean temporary tags at the end of
+   * a placement cycle.
+   * @param applicationId Application Id.
+   */
+  public void cleanTempContainers(ApplicationId applicationId) {
+
+    if (!appTempMappings.get(applicationId).isEmpty()) {
+      appTempMappings.get(applicationId).entrySet().stream().forEach(nodeE -> {
+        nodeE.getValue().entrySet().stream().forEach(containerE -> {
+          removeContainer(nodeE.getKey(), containerE.getKey(),
+              containerE.getValue());
+        });
+      });
+      writeLock.lock();
+      try {
+        appTempMappings.remove(applicationId);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removed TEMP containers of app=" + applicationId);
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+  }
+
+
+  /**
    * Get Node cardinality for a specific tag.
    * When applicationId is null, method returns aggregated cardinality
    *
@@ -378,7 +451,7 @@ public class AllocationTagsManager {
             "Must specify nodeId/tag to query cardinality");
       }
 
-      NodeToCountedTags mapping;
+      TypeToCountedTags mapping;
       if (applicationId != null) {
         mapping = perAppNodeMappings.get(applicationId);
       } else {
@@ -419,7 +492,7 @@ public class AllocationTagsManager {
             "Must specify rack/tag to query cardinality");
       }
 
-      NodeToCountedTags mapping;
+      TypeToCountedTags mapping;
       if (applicationId != null) {
         mapping = perAppRackMappings.get(applicationId);
       } else {
@@ -492,7 +565,7 @@ public class AllocationTagsManager {
             "Must specify nodeId/tags/op to query cardinality");
       }
 
-      NodeToCountedTags mapping;
+      TypeToCountedTags mapping;
       if (applicationId != null) {
         mapping = perAppNodeMappings.get(applicationId);
       } else {
@@ -540,7 +613,7 @@ public class AllocationTagsManager {
             "Must specify rack/tags/op to query cardinality");
       }
 
-      NodeToCountedTags mapping;
+      TypeToCountedTags mapping;
       if (applicationId != null) {
         mapping = perAppRackMappings.get(applicationId);
       } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
new file mode 100644
index 0000000..395c156
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.NodeCandidateSelector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Basic placement algorithm.
+ * Supports different Iterators at SchedulingRequest level including:
+ * Serial, PopularTags
+ */
+public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DefaultPlacementAlgorithm.class);
+
+  private AllocationTagsManager tagsManager;
+  private PlacementConstraintManager constraintManager;
+  private NodeCandidateSelector nodeSelector;
+
+  @Override
+  public void init(RMContext rmContext) {
+    this.tagsManager = rmContext.getAllocationTagsManager();
+    this.constraintManager = rmContext.getPlacementConstraintManager();
+    this.nodeSelector =
+        filter -> ((AbstractYarnScheduler) (rmContext).getScheduler())
+            .getNodes(filter);
+  }
+
+  /**
+   * TODO: Method will be moved to PlacementConstraintsUtil class (YARN-7682)
+   * @param applicationId
+   * @param allocationTags
+   * @param nodeId
+   * @param tagsManager
+   * @return boolean
+   * @throws InvalidAllocationTagsQueryException
+   */
+  public boolean canAssign(ApplicationId applicationId,
+      Set<String> allocationTags, NodeId nodeId,
+      AllocationTagsManager tagsManager)
+      throws InvalidAllocationTagsQueryException {
+    PlacementConstraint constraint =
+        constraintManager.getConstraint(applicationId, allocationTags);
+    if (constraint == null) {
+      return true;
+    }
+    // TODO: proper transformations
+    // Currently works only for simple anti-affinity
+    // NODE scope target expressions
+    PlacementConstraintTransformations.SpecializedConstraintTransformer transformer =
+        new PlacementConstraintTransformations.SpecializedConstraintTransformer(
+            constraint);
+    PlacementConstraint transform = transformer.transform();
+    PlacementConstraint.TargetConstraint targetConstraint =
+        (PlacementConstraint.TargetConstraint) transform.getConstraintExpr();
+    // Assume a single target expression tag;
+    // The Sample Algorithm assumes a constraint will always be a simple
+    // Target Constraint with a single entry in the target set.
+    // As mentioned in the class javadoc - This algorithm should be
+    // used mostly for testing and validating end-2-end workflow.
+    String targetTag = targetConstraint.getTargetExpressions().iterator().next()
+        .getTargetValues().iterator().next();
+    // TODO: Assuming anti-affinity constraint
+    long nodeCardinality =
+        tagsManager.getNodeCardinality(nodeId, applicationId, targetTag);
+    if (nodeCardinality != 0) {
+      return false;
+    }
+    // return true if it is a valid placement
+    return true;
+  }
+
+  public boolean attemptPlacementOnNode(ApplicationId appId,
+      SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
+      throws InvalidAllocationTagsQueryException {
+    int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
+    if (numAllocs > 0) {
+      if (canAssign(appId,
+          schedulingRequest.getAllocationTags(), schedulerNode.getNodeID(),
+          tagsManager)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+
+  @Override
+  public void place(ConstraintPlacementAlgorithmInput input,
+      ConstraintPlacementAlgorithmOutputCollector collector) {
+    BatchedRequests requests = (BatchedRequests) input;
+    ConstraintPlacementAlgorithmOutput resp =
+        new ConstraintPlacementAlgorithmOutput(requests.getApplicationId());
+    List<SchedulerNode> allNodes = nodeSelector.selectNodes(null);
+
+    Iterator<SchedulingRequest> requestIterator = requests.iterator();
+    while (requestIterator.hasNext()) {
+      SchedulingRequest schedulingRequest = requestIterator.next();
+      Iterator<SchedulerNode> nodeIter = allNodes.iterator();
+      int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
+      while (nodeIter.hasNext() && numAllocs > 0) {
+        SchedulerNode node = nodeIter.next();
+        try {
+          if (attemptPlacementOnNode(requests.getApplicationId(),
+              schedulingRequest, node)) {
+            schedulingRequest.getResourceSizing()
+                .setNumAllocations(--numAllocs);
+            PlacedSchedulingRequest placedReq =
+                new PlacedSchedulingRequest(schedulingRequest);
+            placedReq.setPlacementAttempt(requests.getPlacementAttempt());
+            placedReq.getNodes().add(node);
+            resp.getPlacedRequests().add(placedReq);
+            numAllocs =
+                schedulingRequest.getResourceSizing().getNumAllocations();
+            // Add temp-container tags for current placement cycle
+            this.tagsManager.addTempContainer(node.getNodeID(),
+                requests.getApplicationId(),
+                schedulingRequest.getAllocationTags());
+          }
+        } catch (InvalidAllocationTagsQueryException e) {
+          LOG.warn("Got exception from TagManager !", e);
+        }
+      }
+    }
+    // Add all requests whose numAllocations still > 0 to rejected list.
+    requests.getSchedulingRequests().stream()
+        .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0)
+        .forEach(rejReq -> resp.getRejectedRequests().add(rejReq));
+    collector.collect(resp);
+    // Clean current temp-container tags
+    this.tagsManager.cleanTempContainers(requests.getApplicationId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/PopularTagsIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/PopularTagsIterator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/PopularTagsIterator.java
new file mode 100644
index 0000000..ca3e351
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/PopularTagsIterator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+
+/**
+ * Traverse Scheduling requests with the most popular tags (count) first.
+ * Currently the count is per Batch but could use TagManager for global count.
+ */
+public class PopularTagsIterator implements Iterator<SchedulingRequest> {
+
+  private final List<SchedulingRequest> schedulingRequestList;
+  private int cursor;
+
+  public PopularTagsIterator(Collection<SchedulingRequest> schedulingRequests) {
+    this.schedulingRequestList = new ArrayList<>(schedulingRequests);
+    // Most popular First
+    Collections.sort(schedulingRequestList,
+        (o1, o2) -> (int) getTagPopularity(o2) - (int) getTagPopularity(o1));
+
+    this.cursor = 0;
+  }
+
+  private long getTagPopularity(SchedulingRequest o1) {
+    long max = 0;
+    for (String tag : o1.getAllocationTags()) {
+      long count = schedulingRequestList.stream()
+          .filter(req -> req.getAllocationTags().contains(tag)).count();
+      if (count > max) {
+        max = count;
+      }
+    }
+    return max;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return (cursor < schedulingRequestList.size());
+  }
+
+  @Override
+  public SchedulingRequest next() {
+    if (hasNext()) {
+      return schedulingRequestList.get(cursor++);
+    }
+    throw new NoSuchElementException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SerialIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SerialIterator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SerialIterator.java
new file mode 100644
index 0000000..68733a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SerialIterator.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+
+/**
+ * Traverse Scheduling Requests in the same order as they arrive
+ */
+public class SerialIterator implements Iterator<SchedulingRequest> {
+
+  private final List<SchedulingRequest> schedulingRequestList;
+  private int cursor;
+
+  public SerialIterator(Collection<SchedulingRequest> schedulingRequests) {
+    this.schedulingRequestList = new ArrayList<>(schedulingRequests);
+    this.cursor = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return (cursor < schedulingRequestList.size());
+  }
+
+  @Override
+  public SchedulingRequest next() {
+    if (hasNext()) {
+      return schedulingRequestList.get(cursor++);
+    }
+    throw new NoSuchElementException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/package-info.java
new file mode 100644
index 0000000..c84671e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
+ * contains classes related to scheduling containers using placement
+ * constraints.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/package-info.java
new file mode 100644
index 0000000..bb82077
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
+ * contains classes related to scheduling containers using placement
+ * constraints.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
index fe92d2f..8b04860 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
@@ -21,12 +21,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators.PopularTagsIterator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators.SerialIterator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
@@ -35,7 +38,8 @@ import java.util.Set;
  * to place as a batch. The placement algorithm tends to give more optimal
  * placements if more requests are batched together.
  */
-class BatchedRequests implements ConstraintPlacementAlgorithmInput {
+public class BatchedRequests
+    implements ConstraintPlacementAlgorithmInput, Iterable<SchedulingRequest> {
 
   // PlacementAlgorithmOutput attempt - the number of times the requests in this
   // batch has been placed but was rejected by the scheduler.
@@ -44,19 +48,46 @@ class BatchedRequests implements ConstraintPlacementAlgorithmInput {
   private final ApplicationId applicationId;
   private final Collection<SchedulingRequest> requests;
   private final Map<String, Set<NodeId>> blacklist = new HashMap<>();
+  private IteratorType iteratorType;
 
-  BatchedRequests(ApplicationId applicationId,
+  /**
+   * Iterator Type.
+   */
+  public enum IteratorType {
+    SERIAL,
+    POPULAR_TAGS
+  }
+
+  public BatchedRequests(IteratorType type, ApplicationId applicationId,
       Collection<SchedulingRequest> requests, int attempt) {
+    this.iteratorType = type;
     this.applicationId = applicationId;
     this.requests = requests;
     this.placementAttempt = attempt;
   }
 
   /**
+   * Exposes SchedulingRequest Iterator interface which can be used
+   * to traverse requests using different heuristics i.e. Tag Popularity
+   * @return SchedulingRequest Iterator.
+   */
+  @Override
+  public Iterator<SchedulingRequest> iterator() {
+    switch (this.iteratorType) {
+    case SERIAL:
+      return new SerialIterator(requests);
+    case POPULAR_TAGS:
+      return new PopularTagsIterator(requests);
+    default:
+      return null;
+    }
+  }
+
+  /**
    * Get Application Id.
    * @return Application Id.
    */
-  ApplicationId getApplicationId() {
+  public ApplicationId getApplicationId() {
     return applicationId;
   }
 
@@ -73,11 +104,11 @@ class BatchedRequests implements ConstraintPlacementAlgorithmInput {
    * Add a Scheduling request to the batch.
    * @param req Scheduling Request.
    */
-  void addToBatch(SchedulingRequest req) {
+  public void addToBatch(SchedulingRequest req) {
     requests.add(req);
   }
 
-  void addToBlacklist(Set<String> tags, SchedulerNode node) {
+  public void addToBlacklist(Set<String> tags, SchedulerNode node) {
     if (tags != null && !tags.isEmpty()) {
       // We are currently assuming a single allocation tag
       // per scheduler request currently.
@@ -90,7 +121,7 @@ class BatchedRequests implements ConstraintPlacementAlgorithmInput {
    * Get placement attempt.
    * @return PlacementAlgorithmOutput placement Attempt.
    */
-  int getPlacementAttempt() {
+  public int getPlacementAttempt() {
     return placementAttempt;
   }
 
@@ -99,7 +130,7 @@ class BatchedRequests implements ConstraintPlacementAlgorithmInput {
    * @param tag Tag.
    * @return Set of blacklisted Nodes.
    */
-  Set<NodeId> getBlacklist(String tag) {
+  public Set<NodeId> getBlacklist(String tag) {
     return blacklist.getOrDefault(tag, Collections.EMPTY_SET);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
index d613d4e..8e9c79c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
@@ -35,8 +35,10 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
@@ -98,6 +100,7 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
   private Map<ApplicationId, List<SchedulingRequest>> requestsToReject =
       new ConcurrentHashMap<>();
 
+  private BatchedRequests.IteratorType iteratorType;
   private PlacementDispatcher placementDispatcher;
 
 
@@ -122,9 +125,20 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
     if (instances != null && !instances.isEmpty()) {
       algorithm = instances.get(0);
     } else {
-      algorithm = new SamplePlacementAlgorithm();
+      algorithm = new DefaultPlacementAlgorithm();
+    }
+    LOG.info("Placement Algorithm [{}]", algorithm.getClass().getName());
+
+    String iteratorName = ((RMContextImpl) amsContext).getYarnConfiguration()
+        .get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR,
+            BatchedRequests.IteratorType.SERIAL.name());
+    LOG.info("Placement Algorithm Iterator[{}]", iteratorName);
+    try {
+      iteratorType = BatchedRequests.IteratorType.valueOf(iteratorName);
+    } catch (IllegalArgumentException e) {
+      throw new YarnRuntimeException(
+          "Could not instantiate Placement Algorithm Iterator: ", e);
     }
-    LOG.info("Planning Algorithm [{}]", algorithm.getClass().getName());
 
     int algoPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
         YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE,
@@ -188,9 +202,8 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
   private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
       List<SchedulingRequest> schedulingRequests) {
     if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
-      this.placementDispatcher.dispatch(
-          new BatchedRequests(appAttemptId.getApplicationId(),
-              schedulingRequests, 1));
+      this.placementDispatcher.dispatch(new BatchedRequests(iteratorType,
+          appAttemptId.getApplicationId(), schedulingRequests, 1));
     }
   }
 
@@ -329,11 +342,10 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
       }
     }
     if (!isAdded) {
-      BatchedRequests br =
-          new BatchedRequests(schedulerResponse.getApplicationId(),
-              Collections.singleton(
-                  schedulerResponse.getSchedulingRequest()),
-              placementAttempt + 1);
+      BatchedRequests br = new BatchedRequests(iteratorType,
+          schedulerResponse.getApplicationId(),
+          Collections.singleton(schedulerResponse.getSchedulingRequest()),
+          placementAttempt + 1);
       reqsToRetry.add(br);
       br.addToBlacklist(
           schedulerResponse.getSchedulingRequest().getAllocationTags(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java
deleted file mode 100644
index 8d49801..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
-
-import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SpecializedConstraintTransformer;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Sample Test algorithm. Assumes anti-affinity always
- * It also assumes the numAllocations in resource sizing is always = 1
- *
- * NOTE: This is just a sample implementation. Not be actually used
- */
-public class SamplePlacementAlgorithm implements ConstraintPlacementAlgorithm {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SamplePlacementAlgorithm.class);
-
-  private AllocationTagsManager tagsManager;
-  private PlacementConstraintManager constraintManager;
-  private NodeCandidateSelector nodeSelector;
-
-  @Override
-  public void init(RMContext rmContext) {
-    this.tagsManager = rmContext.getAllocationTagsManager();
-    this.constraintManager = rmContext.getPlacementConstraintManager();
-    this.nodeSelector =
-        filter -> ((AbstractYarnScheduler)(rmContext)
-            .getScheduler()).getNodes(filter);
-  }
-
-  @Override
-  public void place(ConstraintPlacementAlgorithmInput input,
-      ConstraintPlacementAlgorithmOutputCollector collector) {
-    BatchedRequests requests = (BatchedRequests)input;
-    ConstraintPlacementAlgorithmOutput resp =
-        new ConstraintPlacementAlgorithmOutput(requests.getApplicationId());
-    List<SchedulerNode> allNodes = nodeSelector.selectNodes(null);
-    Map<String, List<SchedulingRequest>> tagIndexedRequests = new HashMap<>();
-    requests.getSchedulingRequests()
-        .stream()
-        .filter(r -> r.getAllocationTags() != null)
-        .forEach(
-            req -> req.getAllocationTags().forEach(
-                tag -> tagIndexedRequests.computeIfAbsent(tag,
-                    k -> new ArrayList<>()).add(req))
-        );
-    for (Map.Entry<String, List<SchedulingRequest>> entry :
-        tagIndexedRequests.entrySet()) {
-      String tag = entry.getKey();
-      PlacementConstraint constraint =
-          constraintManager.getConstraint(requests.getApplicationId(),
-              Collections.singleton(tag));
-      if (constraint != null) {
-        // Currently works only for simple anti-affinity
-        // NODE scope target expressions
-        SpecializedConstraintTransformer transformer =
-            new SpecializedConstraintTransformer(constraint);
-        PlacementConstraint transform = transformer.transform();
-        TargetConstraint targetConstraint =
-            (TargetConstraint) transform.getConstraintExpr();
-        // Assume a single target expression tag;
-        // The Sample Algorithm assumes a constraint will always be a simple
-        // Target Constraint with a single entry in the target set.
-        // As mentioned in the class javadoc - This algorithm should be
-        // used mostly for testing and validating end-2-end workflow.
-        String targetTag =
-            targetConstraint.getTargetExpressions().iterator().next()
-            .getTargetValues().iterator().next();
-        // iterate over all nodes
-        Iterator<SchedulerNode> nodeIter = allNodes.iterator();
-        List<SchedulingRequest> schedulingRequests = entry.getValue();
-        Iterator<SchedulingRequest> reqIter = schedulingRequests.iterator();
-        while (reqIter.hasNext()) {
-          SchedulingRequest sReq = reqIter.next();
-          int numAllocs = sReq.getResourceSizing().getNumAllocations();
-          while (numAllocs > 0 && nodeIter.hasNext()) {
-            SchedulerNode node = nodeIter.next();
-            long nodeCardinality = 0;
-            try {
-              nodeCardinality = tagsManager.getNodeCardinality(
-                  node.getNodeID(), requests.getApplicationId(),
-                  targetTag);
-              if (nodeCardinality == 0 &&
-                  !requests.getBlacklist(tag).contains(node.getNodeID())) {
-                numAllocs--;
-                sReq.getResourceSizing().setNumAllocations(numAllocs);
-                PlacedSchedulingRequest placedReq =
-                    new PlacedSchedulingRequest(sReq);
-                placedReq.setPlacementAttempt(requests.getPlacementAttempt());
-                placedReq.getNodes().add(node);
-                resp.getPlacedRequests().add(placedReq);
-              }
-            } catch (InvalidAllocationTagsQueryException e) {
-              LOG.warn("Got exception from TagManager !", e);
-            }
-          }
-        }
-      }
-    }
-    // Add all requests whose numAllocations still > 0 to rejected list.
-    requests.getSchedulingRequests().stream()
-        .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0)
-        .forEach(rejReq -> resp.getRejectedRequests().add(rejReq));
-    collector.collect(resp);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
index 0ce1614..f1d5663 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
@@ -75,24 +75,24 @@ public class TestAllocationTagsManager {
 
     // 3 Containers from app1
     atm.addContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        TestUtils.getMockContainerId(1, 1),
         ImmutableSet.of("mapper", "reducer"));
 
     atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        TestUtils.getMockContainerId(1, 2),
         ImmutableSet.of("mapper", "reducer"));
 
     atm.addContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+        TestUtils.getMockContainerId(1, 3),
         ImmutableSet.of("service"));
 
     atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+        TestUtils.getMockContainerId(1, 4),
         ImmutableSet.of("reducer"));
 
     // 1 Container from app2
     atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+        TestUtils.getMockContainerId(2, 3),
         ImmutableSet.of("service"));
 
     // Get Node Cardinality of app1 on node1, with tag "mapper"
@@ -170,24 +170,21 @@ public class TestAllocationTagsManager {
 
     // Finish all containers:
     atm.removeContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        TestUtils.getMockContainerId(1, 1),
         ImmutableSet.of("mapper", "reducer"));
 
     atm.removeContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        TestUtils.getMockContainerId(1, 2),
         ImmutableSet.of("mapper", "reducer"));
 
     atm.removeContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
-        ImmutableSet.of("service"));
+        TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
 
     atm.removeContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
-        ImmutableSet.of("reducer"));
+        TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer"));
 
     atm.removeContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
-        ImmutableSet.of("service"));
+        TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
 
     // Expect all cardinality to be 0
     // Get Cardinality of app1 on node1, with tag "mapper"
@@ -270,25 +267,22 @@ public class TestAllocationTagsManager {
 
     // 3 Containers from app1
     atm.addContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        TestUtils.getMockContainerId(1, 1),
         ImmutableSet.of("mapper", "reducer"));
 
     atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 2),
+        TestUtils.getMockContainerId(2, 2),
         ImmutableSet.of("mapper", "reducer"));
 
     atm.addContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 4),
-        ImmutableSet.of("reducer"));
+        TestUtils.getMockContainerId(2, 4), ImmutableSet.of("reducer"));
 
     atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
-        ImmutableSet.of("service"));
+        TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
 
     // 1 Container from app2
     atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
-        ImmutableSet.of("service"));
+        TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
 
     // Get Rack Cardinality of app1 on rack0, with tag "mapper"
     Assert.assertEquals(1, atm.getRackCardinality("rack0",
@@ -325,45 +319,39 @@ public class TestAllocationTagsManager {
 
     // Add a bunch of containers
     atm.addContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        TestUtils.getMockContainerId(1, 1),
         ImmutableSet.of("mapper", "reducer"));
 
     atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        TestUtils.getMockContainerId(1, 2),
         ImmutableSet.of("mapper", "reducer"));
 
     atm.addContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
-        ImmutableSet.of("service"));
+        TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
 
     atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
-        ImmutableSet.of("reducer"));
+        TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer"));
 
     atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
-        ImmutableSet.of("service"));
+        TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
 
     // Remove all these containers
     atm.removeContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        TestUtils.getMockContainerId(1, 1),
         ImmutableSet.of("mapper", "reducer"));
 
     atm.removeContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        TestUtils.getMockContainerId(1, 2),
         ImmutableSet.of("mapper", "reducer"));
 
     atm.removeContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
-        ImmutableSet.of("service"));
+        TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
 
     atm.removeContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
-        ImmutableSet.of("reducer"));
+        TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer"));
 
     atm.removeContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
-        ImmutableSet.of("service"));
+        TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
 
     // Check internal data structure
     Assert.assertEquals(0,
@@ -375,6 +363,87 @@ public class TestAllocationTagsManager {
   }
 
   @Test
+  public void testTempContainerAllocations()
+      throws InvalidAllocationTagsQueryException {
+    /**
+     * Construct both TEMP and normal containers: Node1: TEMP container_1_1
+     * (mapper/reducer/app_1) container_1_2 (service/app_1)
+     *
+     * Node2: container_1_3 (reducer/app_1) TEMP container_2_1 (service/app_2)
+     */
+
+    AllocationTagsManager atm = new AllocationTagsManager(rmContext);
+
+    // 3 Containers from app1
+    atm.addTempContainer(NodeId.fromString("host1:123"),
+        TestUtils.getMockApplicationId(1),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("host1:123"),
+        TestUtils.getMockContainerId(1, 2), ImmutableSet.of("service"));
+
+    atm.addContainer(NodeId.fromString("host2:123"),
+        TestUtils.getMockContainerId(1, 3), ImmutableSet.of("reducer"));
+
+    // 1 Container from app2
+    atm.addTempContainer(NodeId.fromString("host2:123"),
+        TestUtils.getMockApplicationId(2), ImmutableSet.of("service"));
+
+    // Expect tag mappings to be present including temp Tags
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            Long::sum));
+
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
+            Long::sum));
+
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+            TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+            Long::sum));
+
+    // Do a temp Tag cleanup on app2
+    atm.cleanTempContainers(TestUtils.getMockApplicationId(2));
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+            TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+            Long::sum));
+    // Expect app1 to be unaffected
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            Long::sum));
+    // Do a cleanup on app1 as well
+    atm.cleanTempContainers(TestUtils.getMockApplicationId(1));
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            Long::sum));
+
+    // Non temp-tags should be unaffected
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
+            Long::sum));
+
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+            TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+            Long::sum));
+
+    // Expect app2 with no containers, and app1 with 2 containers across 2 nodes
+    Assert.assertEquals(2,
+        atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(1))
+            .getTypeToTagsWithCount().size());
+
+    Assert.assertNull(
+        atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(2)));
+  }
+
+  @Test
   public void testQueryCardinalityWithIllegalParameters()
       throws InvalidAllocationTagsQueryException {
     /**
@@ -385,24 +454,21 @@ public class TestAllocationTagsManager {
 
     // Add a bunch of containers
     atm.addContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        TestUtils.getMockContainerId(1, 1),
         ImmutableSet.of("mapper", "reducer"));
 
     atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        TestUtils.getMockContainerId(1, 2),
         ImmutableSet.of("mapper", "reducer"));
 
     atm.addContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
-        ImmutableSet.of("service"));
+        TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
 
     atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
-        ImmutableSet.of("reducer"));
+        TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer"));
 
     atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
-        ImmutableSet.of("service"));
+        TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
 
     // No node-id
     boolean caughtException = false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestBatchedRequestsIterators.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestBatchedRequestsIterators.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestBatchedRequestsIterators.java
new file mode 100644
index 0000000..0e7b715
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestBatchedRequestsIterators.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TestPlacementProcessor.schedulingRequest;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test Request Iterator.
+ */
+public class TestBatchedRequestsIterators {
+
+  @Test
+  public void testSerialIterator() throws Exception {
+    List<SchedulingRequest> schedulingRequestList =
+        Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 4, 1, 512, "foo"));
+
+    BatchedRequests batchedRequests = new BatchedRequests(
+        BatchedRequests.IteratorType.SERIAL, null, schedulingRequestList, 1);
+
+    Iterator<SchedulingRequest> requestIterator = batchedRequests.iterator();
+    long prevAllocId = 0;
+    while (requestIterator.hasNext()) {
+      SchedulingRequest request = requestIterator.next();
+      Assert.assertTrue(request.getAllocationRequestId() > prevAllocId);
+      prevAllocId = request.getAllocationRequestId();
+    }
+  }
+
+  @Test
+  public void testPopularTagsIterator() throws Exception {
+    List<SchedulingRequest> schedulingRequestList =
+        Arrays.asList(schedulingRequest(1, 1, 1, 512, "pri", "foo"),
+            schedulingRequest(1, 2, 1, 512, "bar"),
+            schedulingRequest(1, 3, 1, 512, "foo", "pri"),
+            schedulingRequest(1, 4, 1, 512, "test"),
+            schedulingRequest(1, 5, 1, 512, "pri", "bar"));
+
+    BatchedRequests batchedRequests =
+        new BatchedRequests(BatchedRequests.IteratorType.POPULAR_TAGS, null,
+            schedulingRequestList, 1);
+
+    Iterator<SchedulingRequest> requestIterator = batchedRequests.iterator();
+    long recCcount = 0;
+    while (requestIterator.hasNext()) {
+      SchedulingRequest request = requestIterator.next();
+      if (recCcount < 3) {
+        Assert.assertTrue(request.getAllocationTags().contains("pri"));
+      } else {
+        Assert.assertTrue(request.getAllocationTags().contains("bar")
+            || request.getAllocationTags().contains("test"));
+      }
+      recCcount++;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59f0261a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index db8ae15..87dd5b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -373,13 +373,13 @@ public class TestPlacementProcessor {
         rej.getReason());
   }
 
-  private static SchedulingRequest schedulingRequest(
+  protected static SchedulingRequest schedulingRequest(
       int priority, long allocReqId, int cores, int mem, String... tags) {
     return schedulingRequest(priority, allocReqId, cores, mem,
         ExecutionType.GUARANTEED, tags);
   }
 
-  private static SchedulingRequest schedulingRequest(
+  protected static SchedulingRequest schedulingRequest(
       int priority, long allocReqId, int cores, int mem,
       ExecutionType execType, String... tags) {
     return SchedulingRequest.newBuilder()


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org