You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/24 22:48:27 UTC
[26/42] hadoop git commit: YARN-7653. Node group support for
AllocationTagsManager. (Panagiotis Garefalakis via asuresh)
YARN-7653. Node group support for AllocationTagsManager. (Panagiotis Garefalakis via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2b46f2de
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2b46f2de
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2b46f2de
Branch: refs/heads/YARN-6592
Commit: 2b46f2de698daa597b938ee9599b795b9d595485
Parents: 99aa947
Author: Arun Suresh <as...@apache.org>
Authored: Fri Dec 22 07:24:37 2017 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 24 14:20:34 2018 -0800
----------------------------------------------------------------------
.../server/resourcemanager/ResourceManager.java | 2 +-
.../constraint/AllocationTagsManager.java | 282 ++++++++++++++-----
.../rmcontainer/TestRMContainerImpl.java | 2 +-
.../constraint/TestAllocationTagsManager.java | 269 ++++++++++++------
4 files changed, 392 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b46f2de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index ab1560f..2126d4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -496,7 +496,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected AllocationTagsManager createAllocationTagsManager() {
- return new AllocationTagsManager();
+ return new AllocationTagsManager(this.rmContext);
}
protected DelegationTokenRenewer createDelegationTokenRenewer() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b46f2de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
index c278606..7b0b959 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.log4j.Logger;
import java.util.HashMap;
@@ -38,9 +39,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongBinaryOperator;
/**
- * Support storing maps between container-tags/applications and
- * nodes. This will be required by affinity/anti-affinity implementation and
- * cardinality.
+ * In-memory mapping between applications/container-tags and nodes/racks.
+ * Required by constrained affinity/anti-affinity and cardinality placement.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -51,48 +51,54 @@ public class AllocationTagsManager {
private ReentrantReadWriteLock.ReadLock readLock;
private ReentrantReadWriteLock.WriteLock writeLock;
+ private final RMContext rmContext;
- // Application's tags to node
- private Map<ApplicationId, NodeToCountedTags> perAppMappings =
+ // Application's tags to Node
+ private Map<ApplicationId, NodeToCountedTags> perAppNodeMappings =
+ new HashMap<>();
+ // Application's tags to Rack
+ private Map<ApplicationId, NodeToCountedTags> perAppRackMappings =
new HashMap<>();
// Global tags to node mapping (used to fast return aggregated tags
// cardinality across apps)
- private NodeToCountedTags globalMapping = new NodeToCountedTags();
+ private NodeToCountedTags<NodeId> globalNodeMapping = new NodeToCountedTags();
+ // Global tags to Rack mapping
+ private NodeToCountedTags<String> globalRackMapping = new NodeToCountedTags();
/**
- * Store node to counted tags.
+ * Generic store mapping type <T> to counted tags.
+ * Currently used both for NodeId to Tag, Count and Rack to Tag, Count
*/
@VisibleForTesting
- static class NodeToCountedTags {
- // Map<NodeId, Map<Tag, Count>>
- private Map<NodeId, Map<String, Long>> nodeToTagsWithCount =
- new HashMap<>();
+ static class NodeToCountedTags<T> {
+ // Map<Type, Map<Tag, Count>>
+ private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>();
// protected by external locks
- private void addTagsToNode(NodeId nodeId, Set<String> tags) {
- Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
- k -> new HashMap<>());
+ private void addTags(T type, Set<String> tags) {
+ Map<String, Long> innerMap =
+ typeToTagsWithCount.computeIfAbsent(type, k -> new HashMap<>());
for (String tag : tags) {
Long count = innerMap.get(tag);
if (count == null) {
innerMap.put(tag, 1L);
- } else{
+ } else {
innerMap.put(tag, count + 1);
}
}
}
// protected by external locks
- private void addTagToNode(NodeId nodeId, String tag) {
- Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
- k -> new HashMap<>());
+ private void addTag(T type, String tag) {
+ Map<String, Long> innerMap =
+ typeToTagsWithCount.computeIfAbsent(type, k -> new HashMap<>());
Long count = innerMap.get(tag);
if (count == null) {
innerMap.put(tag, 1L);
- } else{
+ } else {
innerMap.put(tag, count + 1);
}
}
@@ -104,17 +110,17 @@ public class AllocationTagsManager {
} else {
if (count <= 0) {
LOG.warn(
- "Trying to remove tags from node, however the count already"
+ "Trying to remove tags from node/rack, however the count already"
+ " becomes 0 or less, it could be a potential bug.");
}
innerMap.remove(tag);
}
}
- private void removeTagsFromNode(NodeId nodeId, Set<String> tags) {
- Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+ private void removeTags(T type, Set<String> tags) {
+ Map<String, Long> innerMap = typeToTagsWithCount.get(type);
if (innerMap == null) {
- LOG.warn("Failed to find node=" + nodeId
+ LOG.warn("Failed to find node/rack=" + type
+ " while trying to remove tags, please double check.");
return;
}
@@ -124,14 +130,14 @@ public class AllocationTagsManager {
}
if (innerMap.isEmpty()) {
- nodeToTagsWithCount.remove(nodeId);
+ typeToTagsWithCount.remove(type);
}
}
- private void removeTagFromNode(NodeId nodeId, String tag) {
- Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+ private void removeTag(T type, String tag) {
+ Map<String, Long> innerMap = typeToTagsWithCount.get(type);
if (innerMap == null) {
- LOG.warn("Failed to find node=" + nodeId
+ LOG.warn("Failed to find node/rack=" + type
+ " while trying to remove tags, please double check.");
return;
}
@@ -139,12 +145,12 @@ public class AllocationTagsManager {
removeTagFromInnerMap(innerMap, tag);
if (innerMap.isEmpty()) {
- nodeToTagsWithCount.remove(nodeId);
+ typeToTagsWithCount.remove(type);
}
}
- private long getCardinality(NodeId nodeId, String tag) {
- Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+ private long getCardinality(T type, String tag) {
+ Map<String, Long> innerMap = typeToTagsWithCount.get(type);
if (innerMap == null) {
return 0;
}
@@ -152,9 +158,9 @@ public class AllocationTagsManager {
return value == null ? 0 : value;
}
- private long getCardinality(NodeId nodeId, Set<String> tags,
+ private long getCardinality(T type, Set<String> tags,
LongBinaryOperator op) {
- Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+ Map<String, Long> innerMap = typeToTagsWithCount.get(type);
if (innerMap == null) {
return 0;
}
@@ -193,29 +199,40 @@ public class AllocationTagsManager {
}
private boolean isEmpty() {
- return nodeToTagsWithCount.isEmpty();
+ return typeToTagsWithCount.isEmpty();
}
@VisibleForTesting
- public Map<NodeId, Map<String, Long>> getNodeToTagsWithCount() {
- return nodeToTagsWithCount;
+ public Map<T, Map<String, Long>> getTypeToTagsWithCount() {
+ return typeToTagsWithCount;
}
}
@VisibleForTesting
- Map<ApplicationId, NodeToCountedTags> getPerAppMappings() {
- return perAppMappings;
+ Map<ApplicationId, NodeToCountedTags> getPerAppNodeMappings() {
+ return perAppNodeMappings;
+ }
+
+ @VisibleForTesting
+ Map<ApplicationId, NodeToCountedTags> getPerAppRackMappings() {
+ return perAppRackMappings;
+ }
+
+ @VisibleForTesting
+ NodeToCountedTags getGlobalNodeMapping() {
+ return globalNodeMapping;
}
@VisibleForTesting
- NodeToCountedTags getGlobalMapping() {
- return globalMapping;
+ NodeToCountedTags getGlobalRackMapping() {
+ return globalRackMapping;
}
- public AllocationTagsManager() {
+ public AllocationTagsManager(RMContext context) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
+ rmContext = context;
}
/**
@@ -243,21 +260,30 @@ public class AllocationTagsManager {
writeLock.lock();
try {
- NodeToCountedTags perAppTagsMapping = perAppMappings.computeIfAbsent(
- applicationId, k -> new NodeToCountedTags());
-
+ NodeToCountedTags perAppTagsMapping = perAppNodeMappings
+ .computeIfAbsent(applicationId, k -> new NodeToCountedTags());
+ NodeToCountedTags perAppRackTagsMapping = perAppRackMappings
+ .computeIfAbsent(applicationId, k -> new NodeToCountedTags());
+ // Covering test-cases where context is mocked
+ String nodeRack = (rmContext.getRMNodes() != null
+ && rmContext.getRMNodes().get(nodeId) != null)
+ ? rmContext.getRMNodes().get(nodeId).getRackName()
+ : "default-rack";
if (useSet) {
- perAppTagsMapping.addTagsToNode(nodeId, allocationTags);
- globalMapping.addTagsToNode(nodeId, allocationTags);
+ perAppTagsMapping.addTags(nodeId, allocationTags);
+ perAppRackTagsMapping.addTags(nodeRack, allocationTags);
+ globalNodeMapping.addTags(nodeId, allocationTags);
+ globalRackMapping.addTags(nodeRack, allocationTags);
} else {
- perAppTagsMapping.addTagToNode(nodeId, applicationIdTag);
- globalMapping.addTagToNode(nodeId, applicationIdTag);
+ perAppTagsMapping.addTag(nodeId, applicationIdTag);
+ perAppRackTagsMapping.addTag(nodeRack, applicationIdTag);
+ globalNodeMapping.addTag(nodeId, applicationIdTag);
+ globalRackMapping.addTag(nodeRack, applicationIdTag);
}
if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Added container=" + containerId + " with tags=[" + StringUtils
- .join(allocationTags, ",") + "]");
+ LOG.debug("Added container=" + containerId + " with tags=["
+ + StringUtils.join(allocationTags, ",") + "]");
}
} finally {
writeLock.unlock();
@@ -287,27 +313,40 @@ public class AllocationTagsManager {
writeLock.lock();
try {
- NodeToCountedTags perAppTagsMapping = perAppMappings.get(applicationId);
+ NodeToCountedTags perAppTagsMapping =
+ perAppNodeMappings.get(applicationId);
+ NodeToCountedTags perAppRackTagsMapping =
+ perAppRackMappings.get(applicationId);
if (perAppTagsMapping == null) {
return;
}
-
+ // Covering test-cases where context is mocked
+ String nodeRack = (rmContext.getRMNodes() != null
+ && rmContext.getRMNodes().get(nodeId) != null)
+ ? rmContext.getRMNodes().get(nodeId).getRackName()
+ : "default-rack";
if (useSet) {
- perAppTagsMapping.removeTagsFromNode(nodeId, allocationTags);
- globalMapping.removeTagsFromNode(nodeId, allocationTags);
+ perAppTagsMapping.removeTags(nodeId, allocationTags);
+ perAppRackTagsMapping.removeTags(nodeRack, allocationTags);
+ globalNodeMapping.removeTags(nodeId, allocationTags);
+ globalRackMapping.removeTags(nodeRack, allocationTags);
} else {
- perAppTagsMapping.removeTagFromNode(nodeId, applicationIdTag);
- globalMapping.removeTagFromNode(nodeId, applicationIdTag);
+ perAppTagsMapping.removeTag(nodeId, applicationIdTag);
+ perAppRackTagsMapping.removeTag(nodeRack, applicationIdTag);
+ globalNodeMapping.removeTag(nodeId, applicationIdTag);
+ globalRackMapping.removeTag(nodeRack, applicationIdTag);
}
if (perAppTagsMapping.isEmpty()) {
- perAppMappings.remove(applicationId);
+ perAppNodeMappings.remove(applicationId);
+ }
+ if (perAppRackTagsMapping.isEmpty()) {
+ perAppRackMappings.remove(applicationId);
}
if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Removed container=" + containerId + " with tags=[" + StringUtils
- .join(allocationTags, ",") + "]");
+ LOG.debug("Removed container=" + containerId + " with tags=["
+ + StringUtils.join(allocationTags, ",") + "]");
}
} finally {
writeLock.unlock();
@@ -315,18 +354,16 @@ public class AllocationTagsManager {
}
/**
- * Get cardinality for following conditions. External can pass-in a binary op
- * to implement customized logic. *
+ * Get Node cardinality for a specific tag.
+ * When applicationId is null, method returns aggregated cardinality
+ *
* @param nodeId nodeId, required.
* @param applicationId applicationId. When null is specified, return
* aggregated cardinality among all nodes.
* @param tag allocation tag, see
* {@link SchedulingRequest#getAllocationTags()},
- * When multiple tags specified. Returns cardinality
- * depends on op. If a specified tag doesn't exist,
- * 0 will be its cardinality.
- * When null/empty tags specified, all tags
- * (of the node/app) will be considered.
+ * If a specified tag doesn't exist,
+ * method returns 0.
* @return cardinality of specified query on the node.
* @throws InvalidAllocationTagsQueryException when illegal query
* parameter specified
@@ -338,14 +375,14 @@ public class AllocationTagsManager {
try {
if (nodeId == null) {
throw new InvalidAllocationTagsQueryException(
- "Must specify nodeId/tags/op to query cardinality");
+ "Must specify nodeId/tag to query cardinality");
}
NodeToCountedTags mapping;
if (applicationId != null) {
- mapping = perAppMappings.get(applicationId);
- } else{
- mapping = globalMapping;
+ mapping = perAppNodeMappings.get(applicationId);
+ } else {
+ mapping = globalNodeMapping;
}
if (mapping == null) {
@@ -359,11 +396,54 @@ public class AllocationTagsManager {
}
/**
+ * Get Rack cardinality for a specific tag.
+ *
+ * @param rack rack, required.
+ * @param applicationId applicationId. When null is specified, return
+ * aggregated cardinality among all nodes.
+ * @param tag allocation tag, see
+ * {@link SchedulingRequest#getAllocationTags()},
+ * If a specified tag doesn't exist,
+ * method returns 0.
+ * @return cardinality of specified query on the rack.
+ * @throws InvalidAllocationTagsQueryException when illegal query
+ * parameter specified
+ */
+ public long getRackCardinality(String rack, ApplicationId applicationId,
+ String tag) throws InvalidAllocationTagsQueryException {
+ readLock.lock();
+
+ try {
+ if (rack == null) {
+ throw new InvalidAllocationTagsQueryException(
+ "Must specify rack/tag to query cardinality");
+ }
+
+ NodeToCountedTags mapping;
+ if (applicationId != null) {
+ mapping = perAppRackMappings.get(applicationId);
+ } else {
+ mapping = globalRackMapping;
+ }
+
+ if (mapping == null) {
+ return 0;
+ }
+
+ return mapping.getCardinality(rack, tag);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+
+
+ /**
* Check if given tag exists on node.
*
* @param nodeId nodeId, required.
* @param applicationId applicationId. When null is specified, return
- * aggregated cardinality among all nodes.
+ * aggregation among all applications.
* @param tag allocation tag, see
* {@link SchedulingRequest#getAllocationTags()},
* When multiple tags specified. Returns cardinality
@@ -387,7 +467,7 @@ public class AllocationTagsManager {
*
* @param nodeId nodeId, required.
* @param applicationId applicationId. When null is specified, return
- * aggregated cardinality among all nodes.
+ * aggregated cardinality among all applications.
* @param tags allocation tags, see
* {@link SchedulingRequest#getAllocationTags()},
* When multiple tags specified. Returns cardinality
@@ -396,7 +476,7 @@ public class AllocationTagsManager {
* specified, all tags (of the node/app) will be
* considered.
* @param op operator. Such as Long::max, Long::sum, etc. Required.
- * This sparameter only take effect when #values >= 2.
+ * This parameter only take effect when #values >= 2.
* @return cardinality of specified query on the node.
* @throws InvalidAllocationTagsQueryException when illegal query
* parameter specified
@@ -414,9 +494,9 @@ public class AllocationTagsManager {
NodeToCountedTags mapping;
if (applicationId != null) {
- mapping = perAppMappings.get(applicationId);
- } else{
- mapping = globalMapping;
+ mapping = perAppNodeMappings.get(applicationId);
+ } else {
+ mapping = globalNodeMapping;
}
if (mapping == null) {
@@ -428,4 +508,52 @@ public class AllocationTagsManager {
readLock.unlock();
}
}
+
+ /**
+ * Get cardinality for following conditions. External can pass-in a binary op
+ * to implement customized logic.
+ *
+ * @param rack rack, required.
+ * @param applicationId applicationId. When null is specified, return
+ * aggregated cardinality among all applications.
+ * @param tags allocation tags, see
+ * {@link SchedulingRequest#getAllocationTags()},
+ * When multiple tags specified. Returns cardinality
+ * depends on op. If a specified tag doesn't exist, 0
+ * will be its cardinality. When null/empty tags
+ * specified, all tags (of the rack/app) will be
+ * considered.
+ * @param op operator. Such as Long::max, Long::sum, etc. Required.
+ * This parameter only take effect when #values >= 2.
+ * @return cardinality of specified query on the rack.
+ * @throws InvalidAllocationTagsQueryException when illegal query
+ * parameter specified
+ */
+ public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
+ Set<String> tags, LongBinaryOperator op)
+ throws InvalidAllocationTagsQueryException {
+ readLock.lock();
+
+ try {
+ if (rack == null || op == null) {
+ throw new InvalidAllocationTagsQueryException(
+ "Must specify rack/tags/op to query cardinality");
+ }
+
+ NodeToCountedTags mapping;
+ if (applicationId != null) {
+ mapping = perAppRackMappings.get(applicationId);
+ } else {
+ mapping = globalRackMapping;
+ }
+
+ if (mapping == null) {
+ return 0;
+ }
+
+ return mapping.getCardinality(rack, tags, op);
+ } finally {
+ readLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b46f2de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index 538d128..b927870 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -405,8 +405,8 @@ public class TestRMContainerImpl {
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
- AllocationTagsManager tagsManager = new AllocationTagsManager();
RMContext rmContext = mock(RMContext.class);
+ AllocationTagsManager tagsManager = new AllocationTagsManager(rmContext);
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b46f2de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
index 4bb2a18..0ce1614 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
@@ -20,202 +20,300 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
-import com.google.common.collect.ImmutableSet;
+import java.util.List;
+
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.ImmutableSet;
+
/**
* Test functionality of AllocationTagsManager.
*/
public class TestAllocationTagsManager {
+ private RMContext rmContext;
+
+ @Before
+ public void setup() {
+ MockRM rm = new MockRM();
+ rm.start();
+ MockNodes.resetHostIds();
+ List<RMNode> rmNodes =
+ MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
+ for (RMNode rmNode : rmNodes) {
+ rm.getRMContext().getRMNodes().putIfAbsent(rmNode.getNodeID(), rmNode);
+ }
+ rmContext = rm.getRMContext();
+ }
+
+
@Test
public void testAllocationTagsManagerSimpleCases()
throws InvalidAllocationTagsQueryException {
- AllocationTagsManager atm = new AllocationTagsManager();
+
+ AllocationTagsManager atm = new AllocationTagsManager(rmContext);
/**
* Construct test case:
- * Node1:
+ * Node1 (rack0):
* container_1_1 (mapper/reducer/app_1)
* container_1_3 (service/app_1)
*
- * Node2:
+ * Node2 (rack0):
* container_1_2 (mapper/reducer/app_1)
* container_1_4 (reducer/app_1)
* container_2_1 (service/app_2)
*/
// 3 Containers from app1
- atm.addContainer(NodeId.fromString("node1:1234"),
+ atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
- atm.addContainer(NodeId.fromString("node2:1234"),
+ atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
- atm.addContainer(NodeId.fromString("node1:1234"),
+ atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
- atm.addContainer(NodeId.fromString("node2:1234"),
+ atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
// 1 Container from app2
- atm.addContainer(NodeId.fromString("node2:1234"),
+ atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
- // Get Cardinality of app1 on node1, with tag "mapper"
+ // Get Node Cardinality of app1 on node1, with tag "mapper"
Assert.assertEquals(1,
- atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::max));
- // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min
+ // Get Rack Cardinality of app1 on rack0, with tag "mapper"
+ Assert.assertEquals(2, atm.getRackCardinality("rack0",
+ TestUtils.getMockApplicationId(1), "mapper"));
+
+ // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min
Assert.assertEquals(1,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::min));
- // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max
+ // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max
Assert.assertEquals(2,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::max));
- // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
+ // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
Assert.assertEquals(3,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::sum));
- // Get Cardinality by passing single tag.
+ // Get Node Cardinality by passing single tag.
Assert.assertEquals(1,
- atm.getNodeCardinality(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinality(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), "mapper"));
Assert.assertEquals(2,
- atm.getNodeCardinality(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinality(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), "reducer"));
- // Get Cardinality of app1 on node2, with tag "no_existed/reducer", op=min
+ // Get Node Cardinality of app1 on node2, with tag "no_existed/reducer",
+ // op=min
Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("no_existed", "reducer"), Long::min));
- // Get Cardinality of app1 on node2, with tag "<applicationId>", op=max
+ // Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max
// (Expect this returns #containers from app1 on node2)
+ Assert
+ .assertEquals(2,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(1),
+ ImmutableSet.of(AllocationTagsNamespaces.APP_ID
+ + TestUtils.getMockApplicationId(1).toString()),
+ Long::max));
+
+ // Get Node Cardinality of app1 on node2, with empty tag set, op=max
Assert.assertEquals(2,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
- TestUtils.getMockApplicationId(1), ImmutableSet
- .of(AllocationTagsNamespaces.APP_ID + TestUtils
- .getMockApplicationId(1).toString()), Long::max));
-
- // Get Cardinality of app1 on node2, with empty tag set, op=max
- Assert.assertEquals(2,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
- // Get Cardinality of all apps on node2, with empty tag set, op=sum
- Assert.assertEquals(7,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null,
- ImmutableSet.of(), Long::sum));
+ // Get Node Cardinality of all apps on node2, with empty tag set, op=sum
+ Assert.assertEquals(7, atm.getNodeCardinalityByOp(
+ NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum));
- // Get Cardinality of app_1 on node2, with empty tag set, op=sum
+ // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
Assert.assertEquals(5,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
- // Get Cardinality of app_1 on node2, with empty tag set, op=sum
+ // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
Assert.assertEquals(2,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
// Finish all containers:
- atm.removeContainer(NodeId.fromString("node1:1234"),
+ atm.removeContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
- atm.removeContainer(NodeId.fromString("node2:1234"),
+ atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
- atm.removeContainer(NodeId.fromString("node1:1234"),
+ atm.removeContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
- atm.removeContainer(NodeId.fromString("node2:1234"),
+ atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
- atm.removeContainer(NodeId.fromString("node2:1234"),
+ atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Expect all cardinality to be 0
// Get Cardinality of app1 on node1, with tag "mapper"
Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::max));
- // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min
+ // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min
Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::min));
- // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max
+ // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max
Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::max));
- // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
+ // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::sum));
- // Get Cardinality of app1 on node2, with tag "<applicationId>", op=max
+ // Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max
// (Expect this returns #containers from app1 on node2)
Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of(TestUtils.getMockApplicationId(1).toString()),
Long::max));
Assert.assertEquals(0,
- atm.getNodeCardinality(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinality(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1),
TestUtils.getMockApplicationId(1).toString()));
- // Get Cardinality of app1 on node2, with empty tag set, op=max
+ // Get Node Cardinality of app1 on node2, with empty tag set, op=max
Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
- // Get Cardinality of all apps on node2, with empty tag set, op=sum
- Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null,
- ImmutableSet.of(), Long::sum));
+ // Get Node Cardinality of all apps on node2, with empty tag set, op=sum
+ Assert.assertEquals(0, atm.getNodeCardinalityByOp(
+ NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum));
- // Get Cardinality of app_1 on node2, with empty tag set, op=sum
+ // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
- // Get Cardinality of app_1 on node2, with empty tag set, op=sum
+ // Get Node Cardinality of app_2 on node2, with empty tag set, op=sum
Assert.assertEquals(0,
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
}
+
+ @Test
+ public void testAllocationTagsManagerRackMapping()
+ throws InvalidAllocationTagsQueryException {
+
+ AllocationTagsManager atm = new AllocationTagsManager(rmContext);
+
+ /**
+ * Construct Rack test case:
+ * Node1 (rack0):
+ * container_1_1 (mapper/reducer/app_1)
+ * container_1_4 (reducer/app_2)
+ *
+ * Node2 (rack0):
+ * container_1_2 (mapper/reducer/app_2)
+ * container_1_3 (service/app_1)
+ *
+ * Node5 (rack1):
+ * container_2_1 (service/app_2)
+ */
+
+ // 3 Containers from app1
+ atm.addContainer(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.addContainer(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 2),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.addContainer(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 4),
+ ImmutableSet.of("reducer"));
+
+ atm.addContainer(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+ ImmutableSet.of("service"));
+
+ // 1 Container from app2
+ atm.addContainer(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+ ImmutableSet.of("service"));
+
+ // Get Rack Cardinality of app1 on rack0, with tag "mapper"
+ Assert.assertEquals(1, atm.getRackCardinality("rack0",
+ TestUtils.getMockApplicationId(1), "mapper"));
+
+ // Get Rack Cardinality of app2 on rack0, with tag "reducer"
+ Assert.assertEquals(2, atm.getRackCardinality("rack0",
+ TestUtils.getMockApplicationId(2), "reducer"));
+
+ // Get Rack Cardinality of all apps on rack0, with tag "reducer"
+ Assert.assertEquals(3, atm.getRackCardinality("rack0", null, "reducer"));
+
+ // Get Rack Cardinality of app_1 on rack0, with empty tag set, op=max
+ Assert.assertEquals(2, atm.getRackCardinalityByOp("rack0",
+ TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
+
+ // Get Rack Cardinality of app_1 on rack0, with empty tag set, op=min
+ Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0",
+ TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::min));
+
+ // Get Rack Cardinality of all apps on rack0, with empty tag set, op=min
+ Assert.assertEquals(3, atm.getRackCardinalityByOp("rack0", null,
+ ImmutableSet.of(), Long::max));
+ }
+
@Test
public void testAllocationTagsManagerMemoryAfterCleanup()
throws InvalidAllocationTagsQueryException {
@@ -223,54 +321,57 @@ public class TestAllocationTagsManager {
* Make sure YARN cleans up all memory once container/app finishes.
*/
- AllocationTagsManager atm = new AllocationTagsManager();
+ AllocationTagsManager atm = new AllocationTagsManager(rmContext);
// Add a bunch of containers
- atm.addContainer(NodeId.fromString("node1:1234"),
+ atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
- atm.addContainer(NodeId.fromString("node2:1234"),
+ atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
- atm.addContainer(NodeId.fromString("node1:1234"),
+ atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
- atm.addContainer(NodeId.fromString("node2:1234"),
+ atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
- atm.addContainer(NodeId.fromString("node2:1234"),
+ atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Remove all these containers
- atm.removeContainer(NodeId.fromString("node1:1234"),
+ atm.removeContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
- atm.removeContainer(NodeId.fromString("node2:1234"),
+ atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
- atm.removeContainer(NodeId.fromString("node1:1234"),
+ atm.removeContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
- atm.removeContainer(NodeId.fromString("node2:1234"),
+ atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
- atm.removeContainer(NodeId.fromString("node2:1234"),
+ atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Check internal data structure
Assert.assertEquals(0,
- atm.getGlobalMapping().getNodeToTagsWithCount().size());
- Assert.assertEquals(0, atm.getPerAppMappings().size());
+ atm.getGlobalNodeMapping().getTypeToTagsWithCount().size());
+ Assert.assertEquals(0, atm.getPerAppNodeMappings().size());
+ Assert.assertEquals(0,
+ atm.getGlobalRackMapping().getTypeToTagsWithCount().size());
+ Assert.assertEquals(0, atm.getPerAppRackMappings().size());
}
@Test
@@ -280,26 +381,26 @@ public class TestAllocationTagsManager {
* Make sure YARN cleans up all memory once container/app finishes.
*/
- AllocationTagsManager atm = new AllocationTagsManager();
+ AllocationTagsManager atm = new AllocationTagsManager(rmContext);
// Add a bunch of containers
- atm.addContainer(NodeId.fromString("node1:1234"),
+ atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
- atm.addContainer(NodeId.fromString("node2:1234"),
+ atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
- atm.addContainer(NodeId.fromString("node1:1234"),
+ atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
- atm.addContainer(NodeId.fromString("node2:1234"),
+ atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
- atm.addContainer(NodeId.fromString("node2:1234"),
+ atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
@@ -317,7 +418,7 @@ public class TestAllocationTagsManager {
// No op
caughtException = false;
try {
- atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("mapper"), null);
} catch (InvalidAllocationTagsQueryException e) {
caughtException = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org