You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/25 05:19:35 UTC

[29/31] hadoop git commit: YARN-7788. Factor out management of temp tags from AllocationTagsManager. (Arun Suresh via kkaranasos)

YARN-7788. Factor out management of temp tags from AllocationTagsManager. (Arun Suresh via kkaranasos)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8055216f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8055216f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8055216f

Branch: refs/heads/YARN-6592
Commit: 8055216fccc9eb238855cc0a477571e222e87159
Parents: 05af8e7
Author: Konstantinos Karanasos <kk...@apache.org>
Authored: Mon Jan 22 23:51:02 2018 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 24 21:17:06 2018 -0800

----------------------------------------------------------------------
 .../constraint/AllocationTagsManager.java       | 110 +++---------
 .../algorithm/DefaultPlacementAlgorithm.java    |   8 +-
 .../algorithm/LocalAllocationTagsManager.java   | 167 +++++++++++++++++++
 .../constraint/TestAllocationTagsManager.java   |  82 ---------
 .../TestLocalAllocationTagsManager.java         | 139 +++++++++++++++
 5 files changed, 336 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8055216f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
index 962e548..7ad5e8c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
@@ -24,17 +24,14 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.log4j.Logger;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -61,9 +58,6 @@ public class AllocationTagsManager {
   // Application's tags to Rack
   private Map<ApplicationId, TypeToCountedTags> perAppRackMappings =
       new HashMap<>();
-  // Application's Temporary containers mapping
-  private Map<ApplicationId, Map<NodeId, Map<ContainerId, Set<String>>>>
-      appTempMappings = new HashMap<>();
 
   // Global tags to node mapping (used to fast return aggregated tags
   // cardinality across apps)
@@ -76,7 +70,7 @@ public class AllocationTagsManager {
    * Currently used both for NodeId to Tag, Count and Rack to Tag, Count
    */
   @VisibleForTesting
-  static class TypeToCountedTags<T> {
+  public static class TypeToCountedTags<T> {
     // Map<Type, Map<Tag, Count>>
     private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>();
 
@@ -214,7 +208,7 @@ public class AllocationTagsManager {
   }
 
   @VisibleForTesting
-  Map<ApplicationId, TypeToCountedTags> getPerAppNodeMappings() {
+  public Map<ApplicationId, TypeToCountedTags> getPerAppNodeMappings() {
     return perAppNodeMappings;
   }
 
@@ -233,12 +227,6 @@ public class AllocationTagsManager {
     return globalRackMapping;
   }
 
-  @VisibleForTesting
-  public Map<NodeId, Map<ContainerId, Set<String>>> getAppTempMappings(
-      ApplicationId applicationId) {
-    return appTempMappings.get(applicationId);
-  }
-
   public AllocationTagsManager(RMContext context) {
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
@@ -246,39 +234,6 @@ public class AllocationTagsManager {
     rmContext = context;
   }
 
-  //
-
-  /**
-   * Method adds a temporary fake-container tag to Node mapping.
-   * Used by the constrained placement algorithm to keep track of containers
-   * that are currently placed on nodes but are not yet allocated.
-   * @param nodeId
-   * @param applicationId
-   * @param allocationTags
-   */
-  public void addTempContainer(NodeId nodeId, ApplicationId applicationId,
-      Set<String> allocationTags) {
-    ContainerId tmpContainer = ContainerId.newContainerId(
-        ApplicationAttemptId.newInstance(applicationId, 1), System.nanoTime());
-
-    writeLock.lock();
-    try {
-      Map<NodeId, Map<ContainerId, Set<String>>> appTempMapping =
-          appTempMappings.computeIfAbsent(applicationId, k -> new HashMap<>());
-      Map<ContainerId, Set<String>> containerTempMapping =
-          appTempMapping.computeIfAbsent(nodeId, k -> new HashMap<>());
-      containerTempMapping.put(tmpContainer, allocationTags);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Added TEMP container=" + tmpContainer + " with tags=["
-            + StringUtils.join(allocationTags, ",") + "]");
-      }
-    } finally {
-      writeLock.unlock();
-    }
-
-    addContainer(nodeId, tmpContainer, allocationTags);
-  }
-
   /**
    * Notify container allocated on a node.
    *
@@ -297,6 +252,15 @@ public class AllocationTagsManager {
     }
     ApplicationId applicationId =
         containerId.getApplicationAttemptId().getApplicationId();
+    addTags(nodeId, applicationId, allocationTags);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Added container=" + containerId + " with tags=["
+          + StringUtils.join(allocationTags, ",") + "]");
+    }
+  }
+
+  public void addTags(NodeId nodeId, ApplicationId applicationId,
+      Set<String> allocationTags) {
     writeLock.lock();
     try {
       TypeToCountedTags perAppTagsMapping = perAppNodeMappings
@@ -312,11 +276,6 @@ public class AllocationTagsManager {
       perAppRackTagsMapping.addTags(nodeRack, allocationTags);
       globalNodeMapping.addTags(nodeId, allocationTags);
       globalRackMapping.addTags(nodeRack, allocationTags);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Added container=" + containerId + " with tags=["
-            + StringUtils.join(allocationTags, ",") + "]");
-      }
     } finally {
       writeLock.unlock();
     }
@@ -339,6 +298,21 @@ public class AllocationTagsManager {
     ApplicationId applicationId =
         containerId.getApplicationAttemptId().getApplicationId();
 
+    removeTags(nodeId, applicationId, allocationTags);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removed container=" + containerId + " with tags=["
+          + StringUtils.join(allocationTags, ",") + "]");
+    }
+  }
+
+  /**
+   * Helper method to just remove the tags associated with a container.
+   * @param nodeId
+   * @param applicationId
+   * @param allocationTags
+   */
+  public void removeTags(NodeId nodeId, ApplicationId applicationId,
+      Set<String> allocationTags) {
     writeLock.lock();
     try {
       TypeToCountedTags perAppTagsMapping =
@@ -364,43 +338,11 @@ public class AllocationTagsManager {
       if (perAppRackTagsMapping.isEmpty()) {
         perAppRackMappings.remove(applicationId);
       }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Removed container=" + containerId + " with tags=["
-            + StringUtils.join(allocationTags, ",") + "]");
-      }
     } finally {
       writeLock.unlock();
     }
   }
 
-  /**
-   * Method removes temporary containers associated with an application
-   * Used by the placement algorithm to clean temporary tags at the end of
-   * a placement cycle.
-   * @param applicationId Application Id.
-   */
-  public void cleanTempContainers(ApplicationId applicationId) {
-
-    if (!appTempMappings.get(applicationId).isEmpty()) {
-      appTempMappings.get(applicationId).entrySet().stream().forEach(nodeE -> {
-        nodeE.getValue().entrySet().stream().forEach(containerE -> {
-          removeContainer(nodeE.getKey(), containerE.getKey(),
-              containerE.getValue());
-        });
-      });
-      writeLock.lock();
-      try {
-        appTempMappings.remove(applicationId);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Removed TEMP containers of app=" + applicationId);
-        }
-      } finally {
-        writeLock.unlock();
-      }
-    }
-  }
-
 
   /**
    * Get Node cardinality for a specific tag.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8055216f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index cf2ed15..9887749 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
@@ -53,13 +52,14 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
   // Number of times to re-attempt placing a single scheduling request.
   private static final int RE_ATTEMPT_COUNT = 2;
 
-  private AllocationTagsManager tagsManager;
+  private LocalAllocationTagsManager tagsManager;
   private PlacementConstraintManager constraintManager;
   private NodeCandidateSelector nodeSelector;
 
   @Override
   public void init(RMContext rmContext) {
-    this.tagsManager = rmContext.getAllocationTagsManager();
+    this.tagsManager = new LocalAllocationTagsManager(
+        rmContext.getAllocationTagsManager());
     this.constraintManager = rmContext.getPlacementConstraintManager();
     this.nodeSelector =
         filter -> ((AbstractYarnScheduler) (rmContext).getScheduler())
@@ -143,7 +143,7 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
             numAllocs =
                 schedulingRequest.getResourceSizing().getNumAllocations();
             // Add temp-container tags for current placement cycle
-            this.tagsManager.addTempContainer(node.getNodeID(),
+            this.tagsManager.addTempTags(node.getNodeID(),
                 requests.getApplicationId(),
                 schedulingRequest.getAllocationTags());
             lastSatisfiedNode = node;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8055216f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java
new file mode 100644
index 0000000..9472719
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongBinaryOperator;
+
+class LocalAllocationTagsManager extends AllocationTagsManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LocalAllocationTagsManager.class);
+
+  private final AllocationTagsManager tagsManager;
+
+  // Application's Temporary containers mapping
+  private Map<ApplicationId, Map<NodeId, Map<String, AtomicInteger>>>
+      appTempMappings = new HashMap<>();
+
+  LocalAllocationTagsManager(
+      AllocationTagsManager allocationTagsManager) {
+    super(null);
+    this.tagsManager = allocationTagsManager;
+  }
+
+  void addTempTags(NodeId nodeId,
+      ApplicationId applicationId, Set<String> allocationTags) {
+    Map<NodeId, Map<String, AtomicInteger>> appTempMapping =
+        appTempMappings.computeIfAbsent(applicationId, k -> new HashMap<>());
+    Map<String, AtomicInteger> containerTempMapping =
+        appTempMapping.computeIfAbsent(nodeId, k -> new HashMap<>());
+    for (String tag : allocationTags) {
+      containerTempMapping.computeIfAbsent(tag,
+          k -> new AtomicInteger(0)).incrementAndGet();
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Added TEMP container with tags=["
+          + StringUtils.join(allocationTags, ",") + "]");
+    }
+    tagsManager.addTags(nodeId, applicationId, allocationTags);
+  }
+
+  void removeTempTags(NodeId nodeId, ApplicationId applicationId,
+      Set<String> allocationTags) {
+    Map<NodeId, Map<String, AtomicInteger>> appTempMapping =
+        appTempMappings.get(applicationId);
+    if (appTempMapping != null) {
+      Map<String, AtomicInteger> containerTempMap =
+          appTempMapping.get(nodeId);
+      if (containerTempMap != null) {
+        for (String tag : allocationTags) {
+          AtomicInteger count = containerTempMap.get(tag);
+          if (count != null) {
+            if (count.decrementAndGet() <= 0) {
+              containerTempMap.remove(tag);
+            }
+          }
+        }
+      }
+    }
+    if (allocationTags != null) {
+      removeTags(nodeId, applicationId, allocationTags);
+    }
+  }
+
+  /**
+   * Method removes temporary containers associated with an application
+   * Used by the placement algorithm to clean temporary tags at the end of
+   * a placement cycle.
+   * @param applicationId Application Id.
+   */
+  public void cleanTempContainers(ApplicationId applicationId) {
+
+    if (!appTempMappings.get(applicationId).isEmpty()) {
+      appTempMappings.get(applicationId).entrySet().stream().forEach(nodeE -> {
+        nodeE.getValue().entrySet().stream().forEach(tagE -> {
+          for (int i = 0; i < tagE.getValue().get(); i++) {
+            removeTags(nodeE.getKey(), applicationId,
+                Collections.singleton(tagE.getKey()));
+          }
+        });
+      });
+      appTempMappings.remove(applicationId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Removed TEMP containers of app=" + applicationId);
+      }
+    }
+  }
+
+  @Override
+  public void addContainer(NodeId nodeId, ContainerId containerId,
+      Set<String> allocationTags) {
+    tagsManager.addContainer(nodeId, containerId, allocationTags);
+  }
+
+  @Override
+  public void removeContainer(NodeId nodeId, ContainerId containerId,
+      Set<String> allocationTags) {
+    tagsManager.removeContainer(nodeId, containerId, allocationTags);
+  }
+
+  @Override
+  public void removeTags(NodeId nodeId, ApplicationId applicationId,
+      Set<String> allocationTags) {
+    tagsManager.removeTags(nodeId, applicationId, allocationTags);
+  }
+
+  @Override
+  public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId,
+      String tag) throws InvalidAllocationTagsQueryException {
+    return tagsManager.getNodeCardinality(nodeId, applicationId, tag);
+  }
+
+  @Override
+  public long getRackCardinality(String rack, ApplicationId applicationId,
+      String tag) throws InvalidAllocationTagsQueryException {
+    return tagsManager.getRackCardinality(rack, applicationId, tag);
+  }
+
+  @Override
+  public boolean allocationTagExistsOnNode(NodeId nodeId,
+      ApplicationId applicationId, String tag)
+      throws InvalidAllocationTagsQueryException {
+    return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag);
+  }
+
+  @Override
+  public long getNodeCardinalityByOp(NodeId nodeId,
+      ApplicationId applicationId, Set<String> tags, LongBinaryOperator op)
+      throws InvalidAllocationTagsQueryException {
+    return tagsManager.getNodeCardinalityByOp(nodeId, applicationId, tags, op);
+  }
+
+  @Override
+  public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
+      Set<String> tags, LongBinaryOperator op)
+      throws InvalidAllocationTagsQueryException {
+    return tagsManager.getRackCardinalityByOp(rack, applicationId, tags, op);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8055216f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
index 7afe4ef..76f451e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
@@ -23,7 +23,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -363,87 +362,6 @@ public class TestAllocationTagsManager {
   }
 
   @Test
-  public void testTempContainerAllocations()
-      throws InvalidAllocationTagsQueryException {
-    /**
-     * Construct both TEMP and normal containers: Node1: TEMP container_1_1
-     * (mapper/reducer/app_1) container_1_2 (service/app_1)
-     *
-     * Node2: container_1_3 (reducer/app_1) TEMP container_2_1 (service/app_2)
-     */
-
-    AllocationTagsManager atm = new AllocationTagsManager(rmContext);
-
-    // 3 Containers from app1
-    atm.addTempContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockApplicationId(1),
-        ImmutableSet.of("mapper", "reducer"));
-
-    atm.addContainer(NodeId.fromString("host1:123"),
-        TestUtils.getMockContainerId(1, 2), ImmutableSet.of("service"));
-
-    atm.addContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockContainerId(1, 3), ImmutableSet.of("reducer"));
-
-    // 1 Container from app2
-    atm.addTempContainer(NodeId.fromString("host2:123"),
-        TestUtils.getMockApplicationId(2), ImmutableSet.of("service"));
-
-    // Expect tag mappings to be present including temp Tags
-    Assert.assertEquals(1,
-        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
-            Long::sum));
-
-    Assert.assertEquals(1,
-        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
-            Long::sum));
-
-    Assert.assertEquals(1,
-        atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
-            Long::sum));
-
-    // Do a temp Tag cleanup on app2
-    atm.cleanTempContainers(TestUtils.getMockApplicationId(2));
-    Assert.assertEquals(0,
-        atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
-            Long::sum));
-    // Expect app1 to be unaffected
-    Assert.assertEquals(1,
-        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
-            Long::sum));
-    // Do a cleanup on app1 as well
-    atm.cleanTempContainers(TestUtils.getMockApplicationId(1));
-    Assert.assertEquals(0,
-        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
-            Long::sum));
-
-    // Non temp-tags should be unaffected
-    Assert.assertEquals(1,
-        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
-            Long::sum));
-
-    Assert.assertEquals(0,
-        atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
-            Long::sum));
-
-    // Expect app2 with no containers, and app1 with 2 containers across 2 nodes
-    Assert.assertEquals(2,
-        atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(1))
-            .getTypeToTagsWithCount().size());
-
-    Assert.assertNull(
-        atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(2)));
-  }
-
-  @Test
   public void testQueryCardinalityWithIllegalParameters()
       throws InvalidAllocationTagsQueryException {
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8055216f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java
new file mode 100644
index 0000000..0b9657f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests the LocalAllocationTagsManager.
+ */
+public class TestLocalAllocationTagsManager {
+
+  private RMContext rmContext;
+
+  @Before
+  public void setup() {
+    MockRM rm = new MockRM();
+    rm.start();
+    MockNodes.resetHostIds();
+    List<RMNode> rmNodes =
+        MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
+    for (RMNode rmNode : rmNodes) {
+      rm.getRMContext().getRMNodes().putIfAbsent(rmNode.getNodeID(), rmNode);
+    }
+    rmContext = rm.getRMContext();
+  }
+
+  @Test
+  public void testTempContainerAllocations()
+      throws InvalidAllocationTagsQueryException {
+    /**
+     * Construct both TEMP and normal containers: Node1: TEMP container_1_1
+     * (mapper/reducer/app_1) container_1_2 (service/app_1)
+     *
+     * Node2: container_1_3 (reducer/app_1) TEMP container_2_1 (service/app_2)
+     */
+
+    AllocationTagsManager atm = new AllocationTagsManager(rmContext);
+    LocalAllocationTagsManager ephAtm =
+        new LocalAllocationTagsManager(atm);
+
+    // 3 Containers from app1
+    ephAtm.addTempTags(NodeId.fromString("host1:123"),
+        TestUtils.getMockApplicationId(1),
+        ImmutableSet.of("mapper", "reducer"));
+
+    atm.addContainer(NodeId.fromString("host1:123"),
+        TestUtils.getMockContainerId(1, 2), ImmutableSet.of("service"));
+
+    atm.addContainer(NodeId.fromString("host2:123"),
+        TestUtils.getMockContainerId(1, 3), ImmutableSet.of("reducer"));
+
+    // 1 Container from app2
+    ephAtm.addTempTags(NodeId.fromString("host2:123"),
+        TestUtils.getMockApplicationId(2), ImmutableSet.of("service"));
+
+    // Expect tag mappings to be present including temp Tags
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            Long::sum));
+
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
+            Long::sum));
+
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+            TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+            Long::sum));
+
+    // Do a temp Tag cleanup on app2
+    ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(2));
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+            TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+            Long::sum));
+    // Expect app1 to be unaffected
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            Long::sum));
+    // Do a cleanup on app1 as well
+    ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(1));
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            Long::sum));
+
+    // Non temp-tags should be unaffected
+    Assert.assertEquals(1,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+            TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
+            Long::sum));
+
+    Assert.assertEquals(0,
+        atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+            TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+            Long::sum));
+
+    // Expect app2 with no containers, and app1 with 2 containers across 2 nodes
+    Assert.assertEquals(2,
+        atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(1))
+            .getTypeToTagsWithCount().size());
+
+    Assert.assertNull(
+        atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(2)));
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org