You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/01/23 09:56:12 UTC

[42/50] [abbrv] hadoop git commit: YARN-7522. Introduce AllocationTagsManager to associate allocation tags to nodes. (Wangda Tan via asuresh)

YARN-7522. Introduce AllocationTagsManager to associate allocation tags to nodes. (Wangda Tan via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e205cab1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e205cab1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e205cab1

Branch: refs/heads/YARN-6592
Commit: e205cab124ef08dabbfeb010f8232f002192f185
Parents: 8e3ac22
Author: Arun Suresh <as...@apache.org>
Authored: Fri Dec 8 00:24:00 2017 -0800
Committer: Sunil G <su...@apache.org>
Committed: Tue Jan 23 15:20:23 2018 +0530

----------------------------------------------------------------------
 .../resourcemanager/RMActiveServiceContext.java |  15 +
 .../yarn/server/resourcemanager/RMContext.java  |   5 +
 .../server/resourcemanager/RMContextImpl.java   |  12 +
 .../server/resourcemanager/ResourceManager.java |   9 +
 .../constraint/AllocationTagsManager.java       | 431 +++++++++++++++++++
 .../constraint/AllocationTagsNamespaces.java    |  31 ++
 .../InvalidAllocationTagsQueryException.java    |  35 ++
 .../rmcontainer/RMContainer.java                |   8 +
 .../rmcontainer/RMContainerImpl.java            |  21 +
 .../constraint/TestAllocationTagsManager.java   | 328 ++++++++++++++
 .../rmcontainer/TestRMContainerImpl.java        | 124 ++++++
 .../scheduler/capacity/TestUtils.java           |   9 +
 .../scheduler/fifo/TestFifoScheduler.java       |   5 +
 13 files changed, 1033 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 9dc5945..6ee3a4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -107,6 +108,7 @@ public class RMActiveServiceContext {
 
   private RMAppLifetimeMonitor rmAppLifetimeMonitor;
   private QueueLimitCalculator queueLimitCalculator;
+  private AllocationTagsManager allocationTagsManager;
 
   public RMActiveServiceContext() {
     queuePlacementManager = new PlacementManager();
@@ -398,6 +400,19 @@ public class RMActiveServiceContext {
 
   @Private
   @Unstable
+  public AllocationTagsManager getAllocationTagsManager() {
+    return allocationTagsManager;
+  }
+
+  @Private
+  @Unstable
+  public void setAllocationTagsManager(
+      AllocationTagsManager allocationTagsManager) {
+    this.allocationTagsManager = allocationTagsManager;
+  }
+
+  @Private
+  @Unstable
   public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
     return rmDelegatedNodeLabelsUpdater;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index ec94030..62899d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -166,4 +167,8 @@ public interface RMContext extends ApplicationMasterServiceContext {
   void setResourceProfilesManager(ResourceProfilesManager mgr);
 
   String getAppProxyUrl(Configuration conf, ApplicationId applicationId);
+
+  AllocationTagsManager getAllocationTagsManager();
+
+  void setAllocationTagsManager(AllocationTagsManager allocationTagsManager);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 80a9109..315fdc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -504,6 +505,17 @@ public class RMContextImpl implements RMContext {
   }
 
   @Override
+  public AllocationTagsManager getAllocationTagsManager() {
+    return activeServiceContext.getAllocationTagsManager();
+  }
+
+  @Override
+  public void setAllocationTagsManager(
+      AllocationTagsManager allocationTagsManager) {
+    activeServiceContext.setAllocationTagsManager(allocationTagsManager);
+  }
+
+  @Override
   public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
     return activeServiceContext.getRMDelegatedNodeLabelsUpdater();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 8641842..8e289ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Pu
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -491,6 +492,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
       throws InstantiationException, IllegalAccessException {
     return new RMNodeLabelsManager();
   }
+
+  protected AllocationTagsManager createAllocationTagsManager() {
+    return new AllocationTagsManager();
+  }
   
   protected DelegationTokenRenewer createDelegationTokenRenewer() {
     return new DelegationTokenRenewer();
@@ -617,6 +622,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
       addService(nlm);
       rmContext.setNodeLabelManager(nlm);
 
+      AllocationTagsManager allocationTagsManager =
+          createAllocationTagsManager();
+      rmContext.setAllocationTagsManager(allocationTagsManager);
+
       RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
           createRMDelegatedNodeLabelsUpdater();
       if (delegatedNodeLabelsUpdater != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java
new file mode 100644
index 0000000..b67fab9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java
@@ -0,0 +1,431 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.constraint;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.LongBinaryOperator;
+
+/**
+ * Support storing maps between container-tags/applications and
+ * nodes. This will be required by affinity/anti-affinity implementation and
+ * cardinality.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class AllocationTagsManager {
+
+  private static final Logger LOG = Logger.getLogger(
+      AllocationTagsManager.class);
+
+  private ReentrantReadWriteLock.ReadLock readLock;
+  private ReentrantReadWriteLock.WriteLock writeLock;
+
+  // Application's tags to node
+  private Map<ApplicationId, NodeToCountedTags> perAppMappings =
+      new HashMap<>();
+
+  // Global tags to node mapping (used to fast return aggregated tags
+  // cardinality across apps)
+  private NodeToCountedTags globalMapping = new NodeToCountedTags();
+
+  /**
+   * Store node to counted tags.
+   */
+  @VisibleForTesting
+  static class NodeToCountedTags {
+    // Map<NodeId, Map<Tag, Count>>
+    private Map<NodeId, Map<String, Long>> nodeToTagsWithCount =
+        new HashMap<>();
+
+    // protected by external locks
+    private void addTagsToNode(NodeId nodeId, Set<String> tags) {
+      Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
+          k -> new HashMap<>());
+
+      for (String tag : tags) {
+        Long count = innerMap.get(tag);
+        if (count == null) {
+          innerMap.put(tag, 1L);
+        } else{
+          innerMap.put(tag, count + 1);
+        }
+      }
+    }
+
+    // protected by external locks
+    private void addTagToNode(NodeId nodeId, String tag) {
+      Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
+          k -> new HashMap<>());
+
+      Long count = innerMap.get(tag);
+      if (count == null) {
+        innerMap.put(tag, 1L);
+      } else{
+        innerMap.put(tag, count + 1);
+      }
+    }
+
+    // protected by external locks; a missing tag is logged, never an NPE
+    private void removeTagFromInnerMap(Map<String, Long> innerMap, String tag) {
+      Long count = innerMap.get(tag);
+      if (count != null && count > 1) {
+        innerMap.put(tag, count - 1);
+        return;
+      }
+      if (count == null || count <= 0) {
+        LOG.warn("Trying to remove tags from node, however the count already"
+            + " becomes 0 or less, it could be a potential bug.");
+      }
+      innerMap.remove(tag);
+    }
+
+    private void removeTagsFromNode(NodeId nodeId, Set<String> tags) {
+      Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+      if (innerMap == null) {
+        LOG.warn("Failed to find node=" + nodeId
+            + " while trying to remove tags, please double check.");
+        return;
+      }
+
+      for (String tag : tags) {
+        removeTagFromInnerMap(innerMap, tag);
+      }
+
+      if (innerMap.isEmpty()) {
+        nodeToTagsWithCount.remove(nodeId);
+      }
+    }
+
+    private void removeTagFromNode(NodeId nodeId, String tag) {
+      Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+      if (innerMap == null) {
+        LOG.warn("Failed to find node=" + nodeId
+            + " while trying to remove tags, please double check.");
+        return;
+      }
+
+      removeTagFromInnerMap(innerMap, tag);
+
+      if (innerMap.isEmpty()) {
+        nodeToTagsWithCount.remove(nodeId);
+      }
+    }
+
+    private long getCardinality(NodeId nodeId, String tag) {
+      Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+      if (innerMap == null) {
+        return 0;
+      }
+      Long value = innerMap.get(tag);
+      return value == null ? 0 : value;
+    }
+
+    private long getCardinality(NodeId nodeId, Set<String> tags,
+        LongBinaryOperator op) {
+      Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+      if (innerMap == null) {
+        return 0;
+      }
+
+      long returnValue = 0;
+      boolean firstTag = true;
+
+      if (tags != null && !tags.isEmpty()) {
+        for (String tag : tags) {
+          Long value = innerMap.get(tag);
+          if (value == null) {
+            value = 0L;
+          }
+
+          if (firstTag) {
+            returnValue = value;
+            firstTag = false;
+            continue;
+          }
+
+          returnValue = op.applyAsLong(returnValue, value);
+        }
+      } else {
+        // Similar to above if, but only iterate values for better performance
+        for (long value : innerMap.values()) {
+          // For the first value, we will not apply op
+          if (firstTag) {
+            returnValue = value;
+            firstTag = false;
+            continue;
+          }
+          returnValue = op.applyAsLong(returnValue, value);
+        }
+      }
+      return returnValue;
+    }
+
+    private boolean isEmpty() {
+      return nodeToTagsWithCount.isEmpty();
+    }
+
+    @VisibleForTesting
+    public Map<NodeId, Map<String, Long>> getNodeToTagsWithCount() {
+      return nodeToTagsWithCount;
+    }
+  }
+
+  @VisibleForTesting
+  Map<ApplicationId, NodeToCountedTags> getPerAppMappings() {
+    return perAppMappings;
+  }
+
+  @VisibleForTesting
+  NodeToCountedTags getGlobalMapping() {
+    return globalMapping;
+  }
+
+  public AllocationTagsManager() {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+  }
+
+  /**
+   * Notify container allocated on a node.
+   *
+   * @param nodeId         allocated node.
+   * @param applicationId  applicationId
+   * @param containerId    container id.
+   * @param allocationTags allocation tags, see
+   *                       {@link SchedulingRequest#getAllocationTags()}
+   *                       application_id will be added to allocationTags.
+   */
+  public void addContainer(NodeId nodeId, ApplicationId applicationId,
+      ContainerId containerId, Set<String> allocationTags) {
+    String applicationIdTag =
+        AllocationTagsNamespaces.APP_ID + applicationId.toString();
+
+    boolean useSet = false;
+    if (allocationTags != null && !allocationTags.isEmpty()) {
+      // Copy before edit it.
+      allocationTags = new HashSet<>(allocationTags);
+      allocationTags.add(applicationIdTag);
+      useSet = true;
+    }
+
+    writeLock.lock();
+    try {
+      NodeToCountedTags perAppTagsMapping = perAppMappings.computeIfAbsent(
+          applicationId, k -> new NodeToCountedTags());
+
+      if (useSet) {
+        perAppTagsMapping.addTagsToNode(nodeId, allocationTags);
+        globalMapping.addTagsToNode(nodeId, allocationTags);
+      } else {
+        perAppTagsMapping.addTagToNode(nodeId, applicationIdTag);
+        globalMapping.addTagToNode(nodeId, applicationIdTag);
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Added container=" + containerId + " with tags=[" + StringUtils
+                .join(allocationTags, ",") + "]");
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Notify container removed.
+   *
+   * @param nodeId         nodeId
+   * @param applicationId  applicationId
+   * @param containerId    containerId.
+   * @param allocationTags allocation tags for given container
+   */
+  public void removeContainer(NodeId nodeId, ApplicationId applicationId,
+      ContainerId containerId, Set<String> allocationTags) {
+    String applicationIdTag =
+        AllocationTagsNamespaces.APP_ID + applicationId.toString();
+    boolean useSet = false;
+
+    if (allocationTags != null && !allocationTags.isEmpty()) {
+      // Copy before edit it.
+      allocationTags = new HashSet<>(allocationTags);
+      allocationTags.add(applicationIdTag);
+      useSet = true;
+    }
+
+    writeLock.lock();
+    try {
+      NodeToCountedTags perAppTagsMapping = perAppMappings.get(applicationId);
+      if (perAppTagsMapping == null) {
+        return;
+      }
+
+      if (useSet) {
+        perAppTagsMapping.removeTagsFromNode(nodeId, allocationTags);
+        globalMapping.removeTagsFromNode(nodeId, allocationTags);
+      } else {
+        perAppTagsMapping.removeTagFromNode(nodeId, applicationIdTag);
+        globalMapping.removeTagFromNode(nodeId, applicationIdTag);
+      }
+
+      if (perAppTagsMapping.isEmpty()) {
+        perAppMappings.remove(applicationId);
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Removed container=" + containerId + " with tags=[" + StringUtils
+                .join(allocationTags, ",") + "]");
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Get cardinality of one allocation tag on a node (optionally per app).
+   *
+   * @param nodeId        nodeId, required.
+   * @param applicationId applicationId. When null is specified, return
+   *                      aggregated cardinality among all applications.
+   * @param tag           allocation tag, see
+   *                      {@link SchedulingRequest#getAllocationTags()}.
+   *                      Exactly one tag is looked up; no operator is
+   *                      involved. If the tag doesn't exist on the node
+   *                      (for the given app, when one is specified),
+   *                      0 will be its cardinality. A null tag is not
+   *                      treated as a wildcard.
+   * @return cardinality of specified query on the node.
+   * @throws InvalidAllocationTagsQueryException when illegal query
+   *                                            parameter specified
+   */
+  public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId,
+      String tag) throws InvalidAllocationTagsQueryException {
+    readLock.lock();
+
+    try {
+      if (nodeId == null) {
+        throw new InvalidAllocationTagsQueryException(
+            "Must specify nodeId/tags/op to query cardinality");
+      }
+
+      NodeToCountedTags mapping;
+      if (applicationId != null) {
+        mapping = perAppMappings.get(applicationId);
+      } else{
+        mapping = globalMapping;
+      }
+
+      if (mapping == null) {
+        return 0;
+      }
+
+      return mapping.getCardinality(nodeId, tag);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Check if given tag exists on node.
+   *
+   * @param nodeId        nodeId, required.
+   * @param applicationId applicationId. When null is specified, return
+   *                      aggregated cardinality among all applications.
+   * @param tag           allocation tag, see
+   *                      {@link SchedulingRequest#getAllocationTags()}.
+   *                      Exactly one tag is looked up; no operator is
+   *                      involved. If the tag doesn't exist on the node
+   *                      (for the given app, when one is specified),
+   *                      0 will be its cardinality. A null tag is not
+   *                      treated as a wildcard.
+   * @return true if the tag's cardinality on the node is greater than 0.
+   * @throws InvalidAllocationTagsQueryException when illegal query
+   *                                            parameter specified
+   */
+  public boolean allocationTagExistsOnNode(NodeId nodeId,
+      ApplicationId applicationId, String tag)
+      throws InvalidAllocationTagsQueryException {
+    return getNodeCardinality(nodeId, applicationId, tag) > 0;
+  }
+
+  /**
+   * Get cardinality for following conditions. External can pass-in a binary op
+   * to implement customized logic.
+   *
+   * @param nodeId        nodeId, required.
+   * @param applicationId applicationId. When null is specified, return
+   *                      aggregated cardinality among all applications.
+   * @param tags          allocation tags, see
+   *                      {@link SchedulingRequest#getAllocationTags()}.
+   *                      When multiple tags are specified, the returned
+   *                      cardinality depends on op. If a specified tag
+   *                      doesn't exist, 0 will be its cardinality. When
+   *                      null/empty tags are specified, all tags (of the
+   *                      node/app) will be considered.
+   * @param op            operator. Such as Long::max, Long::sum, etc. Required.
+   *                      This parameter only takes effect when #values >= 2.
+   * @return cardinality of specified query on the node.
+   * @throws InvalidAllocationTagsQueryException when illegal query
+   *                                            parameter specified
+   */
+  public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId,
+      Set<String> tags, LongBinaryOperator op)
+      throws InvalidAllocationTagsQueryException {
+    readLock.lock();
+
+    try {
+      if (nodeId == null || op == null) {
+        throw new InvalidAllocationTagsQueryException(
+            "Must specify nodeId/tags/op to query cardinality");
+      }
+
+      NodeToCountedTags mapping;
+      if (applicationId != null) {
+        mapping = perAppMappings.get(applicationId);
+      } else{
+        mapping = globalMapping;
+      }
+
+      if (mapping == null) {
+        return 0;
+      }
+
+      return mapping.getCardinality(nodeId, tags, op);
+    } finally {
+      readLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java
new file mode 100644
index 0000000..893ff1c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java
@@ -0,0 +1,31 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.constraint;
+
+/**
+ * Predefined namespaces for tags
+ *
+ * Same as the namespaces of resource types: namespaces of placement tags
+ * start with a letter and end with "/".
+ */
+public class AllocationTagsNamespaces {
+  public static final String APP_ID = "yarn_app_id/";
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java
new file mode 100644
index 0000000..5519e39
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java
@@ -0,0 +1,35 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.constraint;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Exception when invalid parameter specified to do placement tags related
+ * queries.
+ */
+public class InvalidAllocationTagsQueryException extends YarnException {
+  private static final long serialVersionUID = 12312831974894L;
+
+  public InvalidAllocationTagsQueryException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index f3cbf63..8f751b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@@ -115,4 +117,10 @@ public interface RMContainer extends EventHandler<RMContainerEvent>,
   boolean completed();
 
   NodeId getNodeId();
+
+  /**
+   * Return {@link SchedulingRequest#getAllocationTags()} specified by AM.
+   * @return allocation tags, could be null/empty
+   */
+  Set<String> getAllocationTags();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index e26689e..184cdfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -189,6 +190,9 @@ public class RMContainerImpl implements RMContainer {
   private boolean isExternallyAllocated;
   private SchedulerRequestKey allocatedSchedulerKey;
 
+  // TODO, set it when container allocated by scheduler (From SchedulingRequest)
+  private Set<String> allocationTags = null;
+
   public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
       RMContext rmContext) {
@@ -501,6 +505,11 @@ public class RMContainerImpl implements RMContainer {
     return nodeId;
   }
 
+  @Override
+  public Set<String> getAllocationTags() {
+    return allocationTags;
+  }
+
   private static class BaseTransition implements
       SingleArcTransition<RMContainerImpl, RMContainerEvent> {
 
@@ -565,6 +574,12 @@ public class RMContainerImpl implements RMContainer {
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // Notify AllocationTagsManager that the container is allocated
+      container.rmContext.getAllocationTagsManager().addContainer(
+          container.getNodeId(),
+          container.getApplicationAttemptId().getApplicationId(),
+          container.getContainerId(), container.getAllocationTags());
+
       container.eventHandler.handle(new RMAppAttemptEvent(
           container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
     }
@@ -676,6 +691,12 @@ public class RMContainerImpl implements RMContainer {
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // Notify AllocationTagsManager that the container is finished
+      container.rmContext.getAllocationTagsManager().removeContainer(
+          container.getNodeId(),
+          container.getApplicationAttemptId().getApplicationId(),
+          container.getContainerId(), container.getAllocationTags());
+
       RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
 
       container.finishTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java
new file mode 100644
index 0000000..0358792
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java
@@ -0,0 +1,328 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.constraint;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test functionality of AllocationTagsManager.
+ */
+public class TestAllocationTagsManager {
+  @Test
+  public void testAllocationTagsManagerSimpleCases()
+      throws InvalidAllocationTagsQueryException {
+    AllocationTagsManager atm = new AllocationTagsManager();
+
+    /**
+     * Construct test case:
+     * Node1:
+     *    container_1_1 (mapper/reducer/app_1)
+     *    container_1_3 (service/app_1)
+     *
+     * Node2:
+     *    container_1_2 (mapper/reducer/app_1)
+     *    container_1_4 (reducer/app_1)
+     *    container_2_3 (service/app_2)
+     */
+
+    // 4 Containers from app1
+    atm.addContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+        ImmutableSet.of("service"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+        ImmutableSet.of("reducer"));
+
+    // 1 Container from app2
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+        ImmutableSet.of("service"));
+
+    // Get Cardinality of app1 on node1, with tag "mapper"
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            Long::max));
+
+    // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("mapper", "reducer"), Long::min));
+
+    // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max
+    Assert.assertEquals(2,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("mapper", "reducer"), Long::max));
+
+    // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
+    Assert.assertEquals(3,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("mapper", "reducer"), Long::sum));
+
+    // Get Cardinality by passing single tag.
+    Assert.assertEquals(1,
+        atm.getNodeCardinality(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), "mapper"));
+
+    Assert.assertEquals(2,
+        atm.getNodeCardinality(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), "reducer"));
+
+    // Get Cardinality of app1 on node2, with tag "no_existed/reducer", op=min
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("no_existed", "reducer"), Long::min));
+
+    // Get Cardinality of app1 on node2, with tag "<applicationId>", op=max
+    // (Expect this returns #containers from app1 on node2)
+    Assert.assertEquals(2,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet
+                .of(AllocationTagsNamespaces.APP_ID + TestUtils
+                    .getMockApplicationId(1).toString()), Long::max));
+
+    // Get Cardinality of app1 on node2, with empty tag set, op=max
+    Assert.assertEquals(2,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
+
+    // Get Cardinality of all apps on node2, with empty tag set, op=sum
+    Assert.assertEquals(7,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null,
+            ImmutableSet.of(), Long::sum));
+
+    // Get Cardinality of app_1 on node2, with empty tag set, op=sum
+    Assert.assertEquals(5,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
+
+    // Get Cardinality of app_2 on node2, with empty tag set, op=sum
+    Assert.assertEquals(2,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
+
+    // Finish all containers:
+    atm.removeContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.removeContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.removeContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+        ImmutableSet.of("service"));
+
+    atm.removeContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+        ImmutableSet.of("reducer"));
+
+    atm.removeContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+        ImmutableSet.of("service"));
+
+    // Expect all cardinality to be 0
+    // Get Cardinality of app1 on node1, with tag "mapper"
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            Long::max));
+
+    // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("mapper", "reducer"), Long::min));
+
+    // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("mapper", "reducer"), Long::max));
+
+    // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of("mapper", "reducer"), Long::sum));
+
+    // Get Cardinality of app1 on node2, with tag "<applicationId>", op=max
+    // (Expect 0 now that all containers have been removed)
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of(TestUtils.getMockApplicationId(1).toString()),
+            Long::max));
+
+    Assert.assertEquals(0,
+        atm.getNodeCardinality(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1),
+            TestUtils.getMockApplicationId(1).toString()));
+
+    // Get Cardinality of app1 on node2, with empty tag set, op=max
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
+
+    // Get Cardinality of all apps on node2, with empty tag set, op=sum
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null,
+            ImmutableSet.of(), Long::sum));
+
+    // Get Cardinality of app_1 on node2, with empty tag set, op=sum
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
+
+    // Get Cardinality of app_2 on node2, with empty tag set, op=sum
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+            TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
+  }
+
+  @Test
+  public void testAllocationTagsManagerMemoryAfterCleanup()
+      throws InvalidAllocationTagsQueryException {
+    /**
+     * Make sure YARN cleans up all memory once container/app finishes.
+     */
+
+    AllocationTagsManager atm = new AllocationTagsManager();
+
+    // Add a bunch of containers
+    atm.addContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+        ImmutableSet.of("service"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+        ImmutableSet.of("reducer"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+        ImmutableSet.of("service"));
+
+    // Remove all these containers
+    atm.removeContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.removeContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.removeContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+        ImmutableSet.of("service"));
+
+    atm.removeContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+        ImmutableSet.of("reducer"));
+
+    atm.removeContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+        ImmutableSet.of("service"));
+
+    // Check internal data structure
+    Assert.assertEquals(0,
+        atm.getGlobalMapping().getNodeToTagsWithCount().size());
+    Assert.assertEquals(0, atm.getPerAppMappings().size());
+  }
+
+  @Test
+  public void testQueryCardinalityWithIllegalParameters()
+      throws InvalidAllocationTagsQueryException {
+    /**
+     * Make sure cardinality queries with illegal parameters are rejected.
+     */
+
+    AllocationTagsManager atm = new AllocationTagsManager();
+
+    // Add a bunch of containers
+    atm.addContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("node1:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+        ImmutableSet.of("service"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+        ImmutableSet.of("reducer"));
+
+    atm.addContainer(NodeId.fromString("node2:1234"),
+        TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+        ImmutableSet.of("service"));
+
+    // No node-id: the query is expected to throw
+    boolean caughtException = false;
+    try {
+      atm.getNodeCardinalityByOp(null, TestUtils.getMockApplicationId(2),
+          ImmutableSet.of("mapper"), Long::min);
+    } catch (InvalidAllocationTagsQueryException e) {
+      caughtException = true;
+    }
+    Assert.assertTrue("should fail because no nodeId is specified",
+        caughtException);
+
+    // No op: the query is expected to throw
+    caughtException = false;
+    try {
+      atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+          TestUtils.getMockApplicationId(2), ImmutableSet.of("mapper"), null);
+    } catch (InvalidAllocationTagsQueryException e) {
+      caughtException = true;
+    }
+    Assert.assertTrue("should fail because no op is specified",
+        caughtException);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index 6c189b3..27ff311 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -109,6 +110,8 @@ public class TestRMContainerImpl {
     when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
     when(rmContext.getRMApps()).thenReturn(rmApps);
     when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
+    AllocationTagsManager ptm = mock(AllocationTagsManager.class);
+    when(rmContext.getAllocationTagsManager()).thenReturn(ptm);
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(
         YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
@@ -209,6 +212,8 @@ public class TestRMContainerImpl {
     when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
     when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
     when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
+    AllocationTagsManager ptm = mock(AllocationTagsManager.class);
+    when(rmContext.getAllocationTagsManager()).thenReturn(ptm);
 
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(
@@ -367,4 +372,123 @@ public class TestRMContainerImpl {
     verify(publisher, times(1)).containerCreated(any(RMContainer.class), anyLong());
     verify(publisher, times(1)).containerFinished(any(RMContainer.class), anyLong());
   }
+
+  @Test
+  public void testContainerTransitionNotifyPlacementTagsManager()
+      throws Exception {
+    DrainDispatcher drainDispatcher = new DrainDispatcher();
+    EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(
+        EventHandler.class);
+    EventHandler generic = mock(EventHandler.class);
+    drainDispatcher.register(RMAppAttemptEventType.class,
+        appAttemptEventHandler);
+    drainDispatcher.register(RMNodeEventType.class, generic);
+    drainDispatcher.init(new YarnConfiguration());
+    drainDispatcher.start();
+    NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 1);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
+
+    Resource resource = BuilderUtils.newResource(512, 1);
+    Priority priority = BuilderUtils.newPriority(5);
+
+    Container container = BuilderUtils.newContainer(containerId, nodeId,
+        "host:3465", resource, priority, null);
+    ConcurrentMap<ApplicationId, RMApp> rmApps =
+        spy(new ConcurrentHashMap<ApplicationId, RMApp>());
+    RMApp rmApp = mock(RMApp.class);
+    when(rmApp.getRMAppAttempt(Matchers.any())).thenReturn(null);
+    Mockito.doReturn(rmApp).when(rmApps).get(Matchers.any());
+
+    RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+    SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
+    AllocationTagsManager tagsManager = new AllocationTagsManager();
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+    when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+    when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
+    when(rmContext.getRMApps()).thenReturn(rmApps);
+    when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
+    when(rmContext.getAllocationTagsManager()).thenReturn(tagsManager);
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(
+        YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
+        true);
+    when(rmContext.getYarnConfiguration()).thenReturn(conf);
+
+    /* First container: ALLOCATED -> KILLED */
+    RMContainer rmContainer = new RMContainerImpl(container,
+        SchedulerRequestKey.extractFrom(container), appAttemptId,
+        nodeId, "user", rmContext);
+
+    Assert.assertEquals(0,
+        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.START));
+
+    Assert.assertEquals(1,
+        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+    rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
+        .newInstance(containerId, ContainerState.COMPLETE, "", 0),
+        RMContainerEventType.KILL));
+
+    Assert.assertEquals(0,
+        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+    /* Second container: ACQUIRED -> FINISHED */
+    rmContainer = new RMContainerImpl(container,
+        SchedulerRequestKey.extractFrom(container), appAttemptId,
+        nodeId, "user", rmContext);
+
+    Assert.assertEquals(0,
+        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.START));
+
+    Assert.assertEquals(1,
+        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+    rmContainer.handle(
+        new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
+
+    rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
+        .newInstance(containerId, ContainerState.COMPLETE, "", 0),
+        RMContainerEventType.FINISHED));
+
+    Assert.assertEquals(0,
+        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+    /* Third container: RUNNING -> FINISHED */
+    rmContainer = new RMContainerImpl(container,
+        SchedulerRequestKey.extractFrom(container), appAttemptId,
+        nodeId, "user", rmContext);
+
+    Assert.assertEquals(0,
+        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.START));
+
+    Assert.assertEquals(1,
+        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+    rmContainer.handle(
+        new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
+
+    rmContainer.handle(
+        new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+
+    rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
+        .newInstance(containerId, ContainerState.COMPLETE, "", 0),
+        RMContainerEventType.FINISHED));
+
+    Assert.assertEquals(0,
+        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index e3326c7..61a5555 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -135,6 +136,9 @@ public class TestUtils {
         new DefaultResourceCalculator());
     rmContext.setScheduler(mockScheduler);
 
+    AllocationTagsManager ptm = mock(AllocationTagsManager.class);
+    rmContext.setAllocationTagsManager(ptm);
+
     return rmContext;
   }
   
@@ -234,6 +238,11 @@ public class TestUtils {
     doReturn(id).when(containerId).getContainerId();
     return containerId;
   }
+
+  public static ContainerId getMockContainerId(int appId, int containerId) {
+    ApplicationAttemptId attemptId = getMockApplicationAttemptId(appId, 1);
+    return ContainerId.newContainerId(attemptId, containerId);
+  }
   
   public static Container getMockContainer(
       ContainerId containerId, NodeId nodeId, 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e205cab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index 3f97b59..4b902a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -234,6 +235,8 @@ public class TestFifoScheduler {
     FifoScheduler scheduler = new FifoScheduler();
     RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
         null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler);
+    AllocationTagsManager ptm = mock(AllocationTagsManager.class);
+    rmContext.setAllocationTagsManager(ptm);
     rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
     rmContext.setRMApplicationHistoryWriter(
         mock(RMApplicationHistoryWriter.class));
@@ -312,12 +315,14 @@ public class TestFifoScheduler {
     FifoScheduler scheduler = new FifoScheduler();
     RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
         null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler);
+    AllocationTagsManager ptm = mock(AllocationTagsManager.class);
     rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
     rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
     ((RMContextImpl) rmContext).setYarnConfiguration(new YarnConfiguration());
     NullRMNodeLabelsManager nlm = new NullRMNodeLabelsManager();
     nlm.init(new Configuration());
     rmContext.setNodeLabelManager(nlm);
+    rmContext.setAllocationTagsManager(ptm);
 
     scheduler.setRMContext(rmContext);
     ((RMContextImpl) rmContext).setScheduler(scheduler);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org