You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/25 05:19:35 UTC
[29/31] hadoop git commit: YARN-7788. Factor out management of temp
tags from AllocationTagsManager. (Arun Suresh via kkaranasos)
YARN-7788. Factor out management of temp tags from AllocationTagsManager. (Arun Suresh via kkaranasos)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8055216f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8055216f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8055216f
Branch: refs/heads/YARN-6592
Commit: 8055216fccc9eb238855cc0a477571e222e87159
Parents: 05af8e7
Author: Konstantinos Karanasos <kk...@apache.org>
Authored: Mon Jan 22 23:51:02 2018 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 24 21:17:06 2018 -0800
----------------------------------------------------------------------
.../constraint/AllocationTagsManager.java | 110 +++---------
.../algorithm/DefaultPlacementAlgorithm.java | 8 +-
.../algorithm/LocalAllocationTagsManager.java | 167 +++++++++++++++++++
.../constraint/TestAllocationTagsManager.java | 82 ---------
.../TestLocalAllocationTagsManager.java | 139 +++++++++++++++
5 files changed, 336 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8055216f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
index 962e548..7ad5e8c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
@@ -24,17 +24,14 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.log4j.Logger;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -61,9 +58,6 @@ public class AllocationTagsManager {
// Application's tags to Rack
private Map<ApplicationId, TypeToCountedTags> perAppRackMappings =
new HashMap<>();
- // Application's Temporary containers mapping
- private Map<ApplicationId, Map<NodeId, Map<ContainerId, Set<String>>>>
- appTempMappings = new HashMap<>();
// Global tags to node mapping (used to fast return aggregated tags
// cardinality across apps)
@@ -76,7 +70,7 @@ public class AllocationTagsManager {
* Currently used both for NodeId to Tag, Count and Rack to Tag, Count
*/
@VisibleForTesting
- static class TypeToCountedTags<T> {
+ public static class TypeToCountedTags<T> {
// Map<Type, Map<Tag, Count>>
private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>();
@@ -214,7 +208,7 @@ public class AllocationTagsManager {
}
@VisibleForTesting
- Map<ApplicationId, TypeToCountedTags> getPerAppNodeMappings() {
+ public Map<ApplicationId, TypeToCountedTags> getPerAppNodeMappings() {
return perAppNodeMappings;
}
@@ -233,12 +227,6 @@ public class AllocationTagsManager {
return globalRackMapping;
}
- @VisibleForTesting
- public Map<NodeId, Map<ContainerId, Set<String>>> getAppTempMappings(
- ApplicationId applicationId) {
- return appTempMappings.get(applicationId);
- }
-
public AllocationTagsManager(RMContext context) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
@@ -246,39 +234,6 @@ public class AllocationTagsManager {
rmContext = context;
}
- //
-
- /**
- * Method adds a temporary fake-container tag to Node mapping.
- * Used by the constrained placement algorithm to keep track of containers
- * that are currently placed on nodes but are not yet allocated.
- * @param nodeId
- * @param applicationId
- * @param allocationTags
- */
- public void addTempContainer(NodeId nodeId, ApplicationId applicationId,
- Set<String> allocationTags) {
- ContainerId tmpContainer = ContainerId.newContainerId(
- ApplicationAttemptId.newInstance(applicationId, 1), System.nanoTime());
-
- writeLock.lock();
- try {
- Map<NodeId, Map<ContainerId, Set<String>>> appTempMapping =
- appTempMappings.computeIfAbsent(applicationId, k -> new HashMap<>());
- Map<ContainerId, Set<String>> containerTempMapping =
- appTempMapping.computeIfAbsent(nodeId, k -> new HashMap<>());
- containerTempMapping.put(tmpContainer, allocationTags);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added TEMP container=" + tmpContainer + " with tags=["
- + StringUtils.join(allocationTags, ",") + "]");
- }
- } finally {
- writeLock.unlock();
- }
-
- addContainer(nodeId, tmpContainer, allocationTags);
- }
-
/**
* Notify container allocated on a node.
*
@@ -297,6 +252,15 @@ public class AllocationTagsManager {
}
ApplicationId applicationId =
containerId.getApplicationAttemptId().getApplicationId();
+ addTags(nodeId, applicationId, allocationTags);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added container=" + containerId + " with tags=["
+ + StringUtils.join(allocationTags, ",") + "]");
+ }
+ }
+
+ public void addTags(NodeId nodeId, ApplicationId applicationId,
+ Set<String> allocationTags) {
writeLock.lock();
try {
TypeToCountedTags perAppTagsMapping = perAppNodeMappings
@@ -312,11 +276,6 @@ public class AllocationTagsManager {
perAppRackTagsMapping.addTags(nodeRack, allocationTags);
globalNodeMapping.addTags(nodeId, allocationTags);
globalRackMapping.addTags(nodeRack, allocationTags);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added container=" + containerId + " with tags=["
- + StringUtils.join(allocationTags, ",") + "]");
- }
} finally {
writeLock.unlock();
}
@@ -339,6 +298,21 @@ public class AllocationTagsManager {
ApplicationId applicationId =
containerId.getApplicationAttemptId().getApplicationId();
+ removeTags(nodeId, applicationId, allocationTags);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removed container=" + containerId + " with tags=["
+ + StringUtils.join(allocationTags, ",") + "]");
+ }
+ }
+
+ /**
+ * Helper method to just remove the tags associated with a container.
+ * @param nodeId
+ * @param applicationId
+ * @param allocationTags
+ */
+ public void removeTags(NodeId nodeId, ApplicationId applicationId,
+ Set<String> allocationTags) {
writeLock.lock();
try {
TypeToCountedTags perAppTagsMapping =
@@ -364,43 +338,11 @@ public class AllocationTagsManager {
if (perAppRackTagsMapping.isEmpty()) {
perAppRackMappings.remove(applicationId);
}
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removed container=" + containerId + " with tags=["
- + StringUtils.join(allocationTags, ",") + "]");
- }
} finally {
writeLock.unlock();
}
}
- /**
- * Method removes temporary containers associated with an application
- * Used by the placement algorithm to clean temporary tags at the end of
- * a placement cycle.
- * @param applicationId Application Id.
- */
- public void cleanTempContainers(ApplicationId applicationId) {
-
- if (!appTempMappings.get(applicationId).isEmpty()) {
- appTempMappings.get(applicationId).entrySet().stream().forEach(nodeE -> {
- nodeE.getValue().entrySet().stream().forEach(containerE -> {
- removeContainer(nodeE.getKey(), containerE.getKey(),
- containerE.getValue());
- });
- });
- writeLock.lock();
- try {
- appTempMappings.remove(applicationId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removed TEMP containers of app=" + applicationId);
- }
- } finally {
- writeLock.unlock();
- }
- }
- }
-
/**
* Get Node cardinality for a specific tag.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8055216f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index cf2ed15..9887749 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
@@ -53,13 +52,14 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
// Number of times to re-attempt placing a single scheduling request.
private static final int RE_ATTEMPT_COUNT = 2;
- private AllocationTagsManager tagsManager;
+ private LocalAllocationTagsManager tagsManager;
private PlacementConstraintManager constraintManager;
private NodeCandidateSelector nodeSelector;
@Override
public void init(RMContext rmContext) {
- this.tagsManager = rmContext.getAllocationTagsManager();
+ this.tagsManager = new LocalAllocationTagsManager(
+ rmContext.getAllocationTagsManager());
this.constraintManager = rmContext.getPlacementConstraintManager();
this.nodeSelector =
filter -> ((AbstractYarnScheduler) (rmContext).getScheduler())
@@ -143,7 +143,7 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
numAllocs =
schedulingRequest.getResourceSizing().getNumAllocations();
// Add temp-container tags for current placement cycle
- this.tagsManager.addTempContainer(node.getNodeID(),
+ this.tagsManager.addTempTags(node.getNodeID(),
requests.getApplicationId(),
schedulingRequest.getAllocationTags());
lastSatisfiedNode = node;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8055216f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java
new file mode 100644
index 0000000..9472719
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongBinaryOperator;
+
+class LocalAllocationTagsManager extends AllocationTagsManager {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(LocalAllocationTagsManager.class);
+
+ private final AllocationTagsManager tagsManager;
+
+ // Application's Temporary containers mapping
+ private Map<ApplicationId, Map<NodeId, Map<String, AtomicInteger>>>
+ appTempMappings = new HashMap<>();
+
+ LocalAllocationTagsManager(
+ AllocationTagsManager allocationTagsManager) {
+ super(null);
+ this.tagsManager = allocationTagsManager;
+ }
+
+ void addTempTags(NodeId nodeId,
+ ApplicationId applicationId, Set<String> allocationTags) {
+ Map<NodeId, Map<String, AtomicInteger>> appTempMapping =
+ appTempMappings.computeIfAbsent(applicationId, k -> new HashMap<>());
+ Map<String, AtomicInteger> containerTempMapping =
+ appTempMapping.computeIfAbsent(nodeId, k -> new HashMap<>());
+ for (String tag : allocationTags) {
+ containerTempMapping.computeIfAbsent(tag,
+ k -> new AtomicInteger(0)).incrementAndGet();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added TEMP container with tags=["
+ + StringUtils.join(allocationTags, ",") + "]");
+ }
+ tagsManager.addTags(nodeId, applicationId, allocationTags);
+ }
+
+ void removeTempTags(NodeId nodeId, ApplicationId applicationId,
+ Set<String> allocationTags) {
+ Map<NodeId, Map<String, AtomicInteger>> appTempMapping =
+ appTempMappings.get(applicationId);
+ if (appTempMapping != null) {
+ Map<String, AtomicInteger> containerTempMap =
+ appTempMapping.get(nodeId);
+ if (containerTempMap != null) {
+ for (String tag : allocationTags) {
+ AtomicInteger count = containerTempMap.get(tag);
+ if (count != null) {
+ if (count.decrementAndGet() <= 0) {
+ containerTempMap.remove(tag);
+ }
+ }
+ }
+ }
+ }
+ if (allocationTags != null) {
+ removeTags(nodeId, applicationId, allocationTags);
+ }
+ }
+
+ /**
+ * Method removes temporary containers associated with an application
+ * Used by the placement algorithm to clean temporary tags at the end of
+ * a placement cycle.
+ * @param applicationId Application Id.
+ */
+ public void cleanTempContainers(ApplicationId applicationId) {
+
+ if (!appTempMappings.get(applicationId).isEmpty()) {
+ appTempMappings.get(applicationId).entrySet().stream().forEach(nodeE -> {
+ nodeE.getValue().entrySet().stream().forEach(tagE -> {
+ for (int i = 0; i < tagE.getValue().get(); i++) {
+ removeTags(nodeE.getKey(), applicationId,
+ Collections.singleton(tagE.getKey()));
+ }
+ });
+ });
+ appTempMappings.remove(applicationId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removed TEMP containers of app=" + applicationId);
+ }
+ }
+ }
+
+ @Override
+ public void addContainer(NodeId nodeId, ContainerId containerId,
+ Set<String> allocationTags) {
+ tagsManager.addContainer(nodeId, containerId, allocationTags);
+ }
+
+ @Override
+ public void removeContainer(NodeId nodeId, ContainerId containerId,
+ Set<String> allocationTags) {
+ tagsManager.removeContainer(nodeId, containerId, allocationTags);
+ }
+
+ @Override
+ public void removeTags(NodeId nodeId, ApplicationId applicationId,
+ Set<String> allocationTags) {
+ tagsManager.removeTags(nodeId, applicationId, allocationTags);
+ }
+
+ @Override
+ public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId,
+ String tag) throws InvalidAllocationTagsQueryException {
+ return tagsManager.getNodeCardinality(nodeId, applicationId, tag);
+ }
+
+ @Override
+ public long getRackCardinality(String rack, ApplicationId applicationId,
+ String tag) throws InvalidAllocationTagsQueryException {
+ return tagsManager.getRackCardinality(rack, applicationId, tag);
+ }
+
+ @Override
+ public boolean allocationTagExistsOnNode(NodeId nodeId,
+ ApplicationId applicationId, String tag)
+ throws InvalidAllocationTagsQueryException {
+ return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag);
+ }
+
+ @Override
+ public long getNodeCardinalityByOp(NodeId nodeId,
+ ApplicationId applicationId, Set<String> tags, LongBinaryOperator op)
+ throws InvalidAllocationTagsQueryException {
+ return tagsManager.getNodeCardinalityByOp(nodeId, applicationId, tags, op);
+ }
+
+ @Override
+ public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
+ Set<String> tags, LongBinaryOperator op)
+ throws InvalidAllocationTagsQueryException {
+ return tagsManager.getRackCardinalityByOp(rack, applicationId, tags, op);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8055216f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
index 7afe4ef..76f451e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
@@ -23,7 +23,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -363,87 +362,6 @@ public class TestAllocationTagsManager {
}
@Test
- public void testTempContainerAllocations()
- throws InvalidAllocationTagsQueryException {
- /**
- * Construct both TEMP and normal containers: Node1: TEMP container_1_1
- * (mapper/reducer/app_1) container_1_2 (service/app_1)
- *
- * Node2: container_1_3 (reducer/app_1) TEMP container_2_1 (service/app_2)
- */
-
- AllocationTagsManager atm = new AllocationTagsManager(rmContext);
-
- // 3 Containers from app1
- atm.addTempContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1),
- ImmutableSet.of("mapper", "reducer"));
-
- atm.addContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockContainerId(1, 2), ImmutableSet.of("service"));
-
- atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockContainerId(1, 3), ImmutableSet.of("reducer"));
-
- // 1 Container from app2
- atm.addTempContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(2), ImmutableSet.of("service"));
-
- // Expect tag mappings to be present including temp Tags
- Assert.assertEquals(1,
- atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
- Long::sum));
-
- Assert.assertEquals(1,
- atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
- Long::sum));
-
- Assert.assertEquals(1,
- atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
- Long::sum));
-
- // Do a temp Tag cleanup on app2
- atm.cleanTempContainers(TestUtils.getMockApplicationId(2));
- Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
- Long::sum));
- // Expect app1 to be unaffected
- Assert.assertEquals(1,
- atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
- Long::sum));
- // Do a cleanup on app1 as well
- atm.cleanTempContainers(TestUtils.getMockApplicationId(1));
- Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
- Long::sum));
-
- // Non temp-tags should be unaffected
- Assert.assertEquals(1,
- atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
- Long::sum));
-
- Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
- Long::sum));
-
- // Expect app2 with no containers, and app1 with 2 containers across 2 nodes
- Assert.assertEquals(2,
- atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(1))
- .getTypeToTagsWithCount().size());
-
- Assert.assertNull(
- atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(2)));
- }
-
- @Test
public void testQueryCardinalityWithIllegalParameters()
throws InvalidAllocationTagsQueryException {
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8055216f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java
new file mode 100644
index 0000000..0b9657f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests the LocalAllocationTagsManager.
+ */
+public class TestLocalAllocationTagsManager {
+
+ private RMContext rmContext;
+
+ @Before
+ public void setup() {
+ MockRM rm = new MockRM();
+ rm.start();
+ MockNodes.resetHostIds();
+ List<RMNode> rmNodes =
+ MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
+ for (RMNode rmNode : rmNodes) {
+ rm.getRMContext().getRMNodes().putIfAbsent(rmNode.getNodeID(), rmNode);
+ }
+ rmContext = rm.getRMContext();
+ }
+
+ @Test
+ public void testTempContainerAllocations()
+ throws InvalidAllocationTagsQueryException {
+ /**
+ * Construct both TEMP and normal containers: Node1: TEMP container_1_1
+ * (mapper/reducer/app_1) container_1_2 (service/app_1)
+ *
+ * Node2: container_1_3 (reducer/app_1) TEMP container_2_1 (service/app_2)
+ */
+
+ AllocationTagsManager atm = new AllocationTagsManager(rmContext);
+ LocalAllocationTagsManager ephAtm =
+ new LocalAllocationTagsManager(atm);
+
+ // 3 Containers from app1
+ ephAtm.addTempTags(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.addContainer(NodeId.fromString("host1:123"),
+ TestUtils.getMockContainerId(1, 2), ImmutableSet.of("service"));
+
+ atm.addContainer(NodeId.fromString("host2:123"),
+ TestUtils.getMockContainerId(1, 3), ImmutableSet.of("reducer"));
+
+ // 1 Container from app2
+ ephAtm.addTempTags(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(2), ImmutableSet.of("service"));
+
+ // Expect tag mappings to be present including temp Tags
+ Assert.assertEquals(1,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+ Long::sum));
+
+ Assert.assertEquals(1,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
+ Long::sum));
+
+ Assert.assertEquals(1,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+ Long::sum));
+
+ // Do a temp Tag cleanup on app2
+ ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(2));
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+ Long::sum));
+ // Expect app1 to be unaffected
+ Assert.assertEquals(1,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+ Long::sum));
+ // Do a cleanup on app1 as well
+ ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(1));
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+ Long::sum));
+
+ // Non temp-tags should be unaffected
+ Assert.assertEquals(1,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
+ Long::sum));
+
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+ Long::sum));
+
+ // Expect app2 with no containers, and app1 with 2 containers across 2 nodes
+ Assert.assertEquals(2,
+ atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(1))
+ .getTypeToTagsWithCount().size());
+
+ Assert.assertNull(
+ atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(2)));
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org