You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/09 19:56:08 UTC
[50/50] [abbrv] hadoop git commit: YARN-7522. Introduce
AllocationTagsManager to associate allocation tags to nodes. (Wangda Tan via
asuresh)
YARN-7522. Introduce AllocationTagsManager to associate allocation tags to nodes. (Wangda Tan via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3f037a13
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3f037a13
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3f037a13
Branch: refs/heads/YARN-6592
Commit: 3f037a13d4387da946686b62cbd931e361f07835
Parents: c087d11
Author: Arun Suresh <as...@apache.org>
Authored: Fri Dec 8 00:24:00 2017 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Tue Jan 9 11:54:47 2018 -0800
----------------------------------------------------------------------
.../resourcemanager/RMActiveServiceContext.java | 15 +
.../yarn/server/resourcemanager/RMContext.java | 5 +
.../server/resourcemanager/RMContextImpl.java | 12 +
.../server/resourcemanager/ResourceManager.java | 9 +
.../constraint/AllocationTagsManager.java | 431 +++++++++++++++++++
.../constraint/AllocationTagsNamespaces.java | 31 ++
.../InvalidAllocationTagsQueryException.java | 35 ++
.../rmcontainer/RMContainer.java | 8 +
.../rmcontainer/RMContainerImpl.java | 21 +
.../constraint/TestAllocationTagsManager.java | 328 ++++++++++++++
.../rmcontainer/TestRMContainerImpl.java | 124 ++++++
.../scheduler/capacity/TestUtils.java | 9 +
.../scheduler/fifo/TestFifoScheduler.java | 5 +
13 files changed, 1033 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 9dc5945..6ee3a4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -107,6 +108,7 @@ public class RMActiveServiceContext {
private RMAppLifetimeMonitor rmAppLifetimeMonitor;
private QueueLimitCalculator queueLimitCalculator;
+ private AllocationTagsManager allocationTagsManager;
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
@@ -398,6 +400,19 @@ public class RMActiveServiceContext {
+ /**
+ * Returns the {@link AllocationTagsManager} stored in this context.
+ *
+ * @return the stored manager; may be {@code null} if none has been set
+ */
@Private
@Unstable
+ public AllocationTagsManager getAllocationTagsManager() {
+ return allocationTagsManager;
+ }
+
+ /**
+ * Stores the {@link AllocationTagsManager} in this context, replacing any
+ * previously stored reference.
+ *
+ * @param allocationTagsManager the manager to store
+ */
+ @Private
+ @Unstable
+ public void setAllocationTagsManager(
+ AllocationTagsManager allocationTagsManager) {
+ this.allocationTagsManager = allocationTagsManager;
+ }
+
+ @Private
+ @Unstable
public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
return rmDelegatedNodeLabelsUpdater;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index ec94030..62899d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -166,4 +167,8 @@ public interface RMContext extends ApplicationMasterServiceContext {
void setResourceProfilesManager(ResourceProfilesManager mgr);
String getAppProxyUrl(Configuration conf, ApplicationId applicationId);
+
+ /**
+ * Returns the {@link AllocationTagsManager} associated with this context.
+ *
+ * @return the allocation tags manager
+ */
+ AllocationTagsManager getAllocationTagsManager();
+
+ /**
+ * Sets the {@link AllocationTagsManager} associated with this context.
+ *
+ * @param allocationTagsManager the allocation tags manager to use
+ */
+ void setAllocationTagsManager(AllocationTagsManager allocationTagsManager);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 80a9109..315fdc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -504,6 +505,17 @@ public class RMContextImpl implements RMContext {
}
@Override
+ public AllocationTagsManager getAllocationTagsManager() {
+ // The manager is held by the active service context; simply delegate.
+ return activeServiceContext.getAllocationTagsManager();
+ }
+
+ @Override
+ public void setAllocationTagsManager(
+ AllocationTagsManager allocationTagsManager) {
+ // The manager is held by the active service context; simply delegate.
+ activeServiceContext.setAllocationTagsManager(allocationTagsManager);
+ }
+
+ @Override
public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
return activeServiceContext.getRMDelegatedNodeLabelsUpdater();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index a0317f6..8d1000e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Pu
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -490,6 +491,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
throws InstantiationException, IllegalAccessException {
return new RMNodeLabelsManager();
}
+
+ /**
+ * Creates the {@link AllocationTagsManager} instance used by this
+ * ResourceManager. Being protected and non-final, it can be overridden by
+ * subclasses to supply a different instance.
+ *
+ * @return a new AllocationTagsManager
+ */
+ protected AllocationTagsManager createAllocationTagsManager() {
+ return new AllocationTagsManager();
+ }
protected DelegationTokenRenewer createDelegationTokenRenewer() {
return new DelegationTokenRenewer();
@@ -609,6 +614,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
addService(nlm);
rmContext.setNodeLabelManager(nlm);
+ AllocationTagsManager allocationTagsManager =
+ createAllocationTagsManager();
+ rmContext.setAllocationTagsManager(allocationTagsManager);
+
RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
createRMDelegatedNodeLabelsUpdater();
if (delegatedNodeLabelsUpdater != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java
new file mode 100644
index 0000000..b67fab9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsManager.java
@@ -0,0 +1,431 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.constraint;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.LongBinaryOperator;
+
+/**
+ * Support storing maps between container-tags/applications and
+ * nodes. This will be required by affinity/anti-affinity implementation and
+ * cardinality.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class AllocationTagsManager {
+
+ private static final Logger LOG = Logger.getLogger(
+ AllocationTagsManager.class);
+
+ private ReentrantReadWriteLock.ReadLock readLock;
+ private ReentrantReadWriteLock.WriteLock writeLock;
+
+ // Application's tags to node
+ private Map<ApplicationId, NodeToCountedTags> perAppMappings =
+ new HashMap<>();
+
+ // Global tags to node mapping (used to fast return aggregated tags
+ // cardinality across apps)
+ private NodeToCountedTags globalMapping = new NodeToCountedTags();
+
+ /**
+ * Store node to counted tags.
+ */
+ @VisibleForTesting
+ static class NodeToCountedTags {
+ // Map<NodeId, Map<Tag, Count>>
+ private Map<NodeId, Map<String, Long>> nodeToTagsWithCount =
+ new HashMap<>();
+
+ // protected by external locks
+ private void addTagsToNode(NodeId nodeId, Set<String> tags) {
+ Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
+ k -> new HashMap<>());
+
+ for (String tag : tags) {
+ Long count = innerMap.get(tag);
+ if (count == null) {
+ innerMap.put(tag, 1L);
+ } else{
+ innerMap.put(tag, count + 1);
+ }
+ }
+ }
+
+ // protected by external locks
+ private void addTagToNode(NodeId nodeId, String tag) {
+ Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
+ k -> new HashMap<>());
+
+ Long count = innerMap.get(tag);
+ if (count == null) {
+ innerMap.put(tag, 1L);
+ } else{
+ innerMap.put(tag, count + 1);
+ }
+ }
+
+ private void removeTagFromInnerMap(Map<String, Long> innerMap, String tag) {
+ Long count = innerMap.get(tag);
+ if (count > 1) {
+ innerMap.put(tag, count - 1);
+ } else {
+ if (count <= 0) {
+ LOG.warn(
+ "Trying to remove tags from node, however the count already"
+ + " becomes 0 or less, it could be a potential bug.");
+ }
+ innerMap.remove(tag);
+ }
+ }
+
+ private void removeTagsFromNode(NodeId nodeId, Set<String> tags) {
+ Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+ if (innerMap == null) {
+ LOG.warn("Failed to find node=" + nodeId
+ + " while trying to remove tags, please double check.");
+ return;
+ }
+
+ for (String tag : tags) {
+ removeTagFromInnerMap(innerMap, tag);
+ }
+
+ if (innerMap.isEmpty()) {
+ nodeToTagsWithCount.remove(nodeId);
+ }
+ }
+
+ private void removeTagFromNode(NodeId nodeId, String tag) {
+ Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+ if (innerMap == null) {
+ LOG.warn("Failed to find node=" + nodeId
+ + " while trying to remove tags, please double check.");
+ return;
+ }
+
+ removeTagFromInnerMap(innerMap, tag);
+
+ if (innerMap.isEmpty()) {
+ nodeToTagsWithCount.remove(nodeId);
+ }
+ }
+
+ private long getCardinality(NodeId nodeId, String tag) {
+ Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+ if (innerMap == null) {
+ return 0;
+ }
+ Long value = innerMap.get(tag);
+ return value == null ? 0 : value;
+ }
+
+ private long getCardinality(NodeId nodeId, Set<String> tags,
+ LongBinaryOperator op) {
+ Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
+ if (innerMap == null) {
+ return 0;
+ }
+
+ long returnValue = 0;
+ boolean firstTag = true;
+
+ if (tags != null && !tags.isEmpty()) {
+ for (String tag : tags) {
+ Long value = innerMap.get(tag);
+ if (value == null) {
+ value = 0L;
+ }
+
+ if (firstTag) {
+ returnValue = value;
+ firstTag = false;
+ continue;
+ }
+
+ returnValue = op.applyAsLong(returnValue, value);
+ }
+ } else {
+ // Similar to above if, but only iterate values for better performance
+ for (long value : innerMap.values()) {
+ // For the first value, we will not apply op
+ if (firstTag) {
+ returnValue = value;
+ firstTag = false;
+ continue;
+ }
+ returnValue = op.applyAsLong(returnValue, value);
+ }
+ }
+ return returnValue;
+ }
+
+ private boolean isEmpty() {
+ return nodeToTagsWithCount.isEmpty();
+ }
+
+ @VisibleForTesting
+ public Map<NodeId, Map<String, Long>> getNodeToTagsWithCount() {
+ return nodeToTagsWithCount;
+ }
+ }
+
+ /**
+ * Exposes the live per-application mappings (not a copy) for tests.
+ * No lock is taken here.
+ *
+ * @return the internal application-to-mapping map
+ */
+ @VisibleForTesting
+ Map<ApplicationId, NodeToCountedTags> getPerAppMappings() {
+ return perAppMappings;
+ }
+
+ /**
+ * Exposes the live global mapping (not a copy) for tests.
+ * No lock is taken here.
+ *
+ * @return the internal mapping aggregated across applications
+ */
+ @VisibleForTesting
+ NodeToCountedTags getGlobalMapping() {
+ return globalMapping;
+ }
+
+ /**
+ * Creates an empty manager. The read and write locks used by this class
+ * are the two halves of a single {@link ReentrantReadWriteLock}.
+ */
+ public AllocationTagsManager() {
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
+ }
+
+ /**
+  * Records a newly allocated container so that its tags are counted on the
+  * node, both for the owning application and in the global mapping.
+  *
+  * <p>A tag derived from the application id
+  * ({@link AllocationTagsNamespaces#APP_ID} followed by the id) is always
+  * counted, whether or not the caller supplies any tags. The caller's set is
+  * never modified; a copy is taken before the application tag is added.
+  *
+  * @param nodeId node the container was allocated on
+  * @param applicationId application owning the container
+  * @param containerId container id, used for debug logging only
+  * @param allocationTags allocation tags, see
+  *                       {@link SchedulingRequest#getAllocationTags()};
+  *                       may be null or empty
+  */
+ public void addContainer(NodeId nodeId, ApplicationId applicationId,
+     ContainerId containerId, Set<String> allocationTags) {
+   final String appIdTag =
+       AllocationTagsNamespaces.APP_ID + applicationId.toString();
+
+   final boolean hasUserTags =
+       allocationTags != null && !allocationTags.isEmpty();
+   if (hasUserTags) {
+     // Work on a private copy so the caller's set stays untouched.
+     allocationTags = new HashSet<>(allocationTags);
+     allocationTags.add(appIdTag);
+   }
+
+   writeLock.lock();
+   try {
+     NodeToCountedTags appMapping = perAppMappings.get(applicationId);
+     if (appMapping == null) {
+       appMapping = new NodeToCountedTags();
+       perAppMappings.put(applicationId, appMapping);
+     }
+
+     if (hasUserTags) {
+       appMapping.addTagsToNode(nodeId, allocationTags);
+       globalMapping.addTagsToNode(nodeId, allocationTags);
+     } else {
+       // Without user tags only the application id tag is counted.
+       appMapping.addTagToNode(nodeId, appIdTag);
+       globalMapping.addTagToNode(nodeId, appIdTag);
+     }
+
+     if (LOG.isDebugEnabled()) {
+       final String joinedTags = StringUtils.join(allocationTags, ",");
+       LOG.debug(
+           "Added container=" + containerId + " with tags=[" + joinedTags
+               + "]");
+     }
+   } finally {
+     writeLock.unlock();
+   }
+ }
+
+ /**
+  * Un-records a container: decrements the counters of its tags (and of the
+  * tag derived from its application id) on the node, for the owning
+  * application and in the global mapping.
+  *
+  * <p>Nothing happens when the application has no mapping. The
+  * application's mapping is dropped once it holds no node any more. The
+  * caller's set is never modified; a copy is taken before the application
+  * tag is added.
+  *
+  * @param nodeId node the container ran on
+  * @param applicationId application owning the container
+  * @param containerId container id, used for debug logging only
+  * @param allocationTags allocation tags of the container; may be null or
+  *                       empty
+  */
+ public void removeContainer(NodeId nodeId, ApplicationId applicationId,
+     ContainerId containerId, Set<String> allocationTags) {
+   final String appIdTag =
+       AllocationTagsNamespaces.APP_ID + applicationId.toString();
+
+   final boolean hasUserTags =
+       allocationTags != null && !allocationTags.isEmpty();
+   if (hasUserTags) {
+     // Work on a private copy so the caller's set stays untouched.
+     allocationTags = new HashSet<>(allocationTags);
+     allocationTags.add(appIdTag);
+   }
+
+   writeLock.lock();
+   try {
+     final NodeToCountedTags appMapping = perAppMappings.get(applicationId);
+     if (appMapping == null) {
+       return;
+     }
+
+     if (hasUserTags) {
+       appMapping.removeTagsFromNode(nodeId, allocationTags);
+       globalMapping.removeTagsFromNode(nodeId, allocationTags);
+     } else {
+       // Without user tags only the application id tag was counted.
+       appMapping.removeTagFromNode(nodeId, appIdTag);
+       globalMapping.removeTagFromNode(nodeId, appIdTag);
+     }
+
+     if (appMapping.isEmpty()) {
+       perAppMappings.remove(applicationId);
+     }
+
+     if (LOG.isDebugEnabled()) {
+       final String joinedTags = StringUtils.join(allocationTags, ",");
+       LOG.debug(
+           "Removed container=" + containerId + " with tags=[" + joinedTags
+               + "]");
+     }
+   } finally {
+     writeLock.unlock();
+   }
+ }
+
+ /**
+  * Returns how many times a single allocation tag is recorded on a node.
+  *
+  * @param nodeId nodeId, required.
+  * @param applicationId applicationId. When null is specified, the global
+  *                      mapping (counts aggregated across applications) is
+  *                      queried instead of a single application's mapping.
+  * @param tag allocation tag, see
+  *            {@link SchedulingRequest#getAllocationTags()}.
+  * @return cardinality of the tag on the node; 0 when the application, the
+  *         node or the tag is not recorded.
+  * @throws InvalidAllocationTagsQueryException when nodeId is null
+  */
+ public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId,
+     String tag) throws InvalidAllocationTagsQueryException {
+   // Argument validation touches no shared state, so it needs no lock.
+   if (nodeId == null) {
+     throw new InvalidAllocationTagsQueryException(
+         "Must specify nodeId/tags/op to query cardinality");
+   }
+
+   readLock.lock();
+   try {
+     final NodeToCountedTags mapping = (applicationId == null)
+         ? globalMapping
+         : perAppMappings.get(applicationId);
+
+     return (mapping == null) ? 0 : mapping.getCardinality(nodeId, tag);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Checks whether a given allocation tag is recorded on a node.
+  *
+  * @param nodeId nodeId, required.
+  * @param applicationId applicationId. When null is specified, the global
+  *                      mapping (counts aggregated across applications) is
+  *                      queried instead of a single application's mapping.
+  * @param tag allocation tag, see
+  *            {@link SchedulingRequest#getAllocationTags()}.
+  * @return true when the cardinality of the tag on the node is greater
+  *         than 0, false otherwise.
+  * @throws InvalidAllocationTagsQueryException when illegal query
+  *                                             parameter specified
+  */
+ public boolean allocationTagExistsOnNode(NodeId nodeId,
+     ApplicationId applicationId, String tag)
+     throws InvalidAllocationTagsQueryException {
+   final long cardinality = getNodeCardinality(nodeId, applicationId, tag);
+   return cardinality > 0;
+ }
+
+ /**
+ * Get cardinality for following conditions. External can pass-in a binary op
+ * to implement customized logic.
+ *
+ * @param nodeId nodeId, required.
+ * @param applicationId applicationId. When null is specified, return
+ * aggregated cardinality among all nodes.
+ * @param tags allocation tags, see
+ * {@link SchedulingRequest#getAllocationTags()},
+ * When multiple tags specified. Returns cardinality
+ * depends on op. If a specified tag doesn't exist, 0
+ * will be its cardinality. When null/empty tags
+ * specified, all tags (of the node/app) will be
+ * considered.
+ * @param op operator. Such as Long::max, Long::sum, etc. Required.
+ * This sparameter only take effect when #values >= 2.
+ * @return cardinality of specified query on the node.
+ * @throws InvalidAllocationTagsQueryException when illegal query
+ * parameter specified
+ */
+ public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId,
+ Set<String> tags, LongBinaryOperator op)
+ throws InvalidAllocationTagsQueryException {
+ readLock.lock();
+
+ try {
+ if (nodeId == null || op == null) {
+ throw new InvalidAllocationTagsQueryException(
+ "Must specify nodeId/tags/op to query cardinality");
+ }
+
+ NodeToCountedTags mapping;
+ if (applicationId != null) {
+ mapping = perAppMappings.get(applicationId);
+ } else{
+ mapping = globalMapping;
+ }
+
+ if (mapping == null) {
+ return 0;
+ }
+
+ return mapping.getCardinality(nodeId, tags, op);
+ } finally {
+ readLock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java
new file mode 100644
index 0000000..893ff1c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/AllocationTagsNamespaces.java
@@ -0,0 +1,31 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.constraint;
+
+/**
+ * Predefined namespaces for allocation tags.
+ *
+ * Follows the same convention as namespaces of resource types: a namespace
+ * of placement tags starts with letters and ends with "/".
+ */
+public class AllocationTagsNamespaces {
+ // Namespace prefix for application-id tags; the tag is formed by appending
+ // the application id string to this prefix.
+ public static final String APP_ID = "yarn_app_id/";
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java
new file mode 100644
index 0000000..5519e39
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/InvalidAllocationTagsQueryException.java
@@ -0,0 +1,35 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.constraint;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Exception thrown when an invalid parameter is specified for a query
+ * related to placement (allocation) tags.
+ */
+public class InvalidAllocationTagsQueryException extends YarnException {
+ private static final long serialVersionUID = 12312831974894L;
+
+ /**
+ * Creates the exception with a message describing the invalid query.
+ *
+ * @param msg detail message passed to {@link YarnException}
+ */
+ public InvalidAllocationTagsQueryException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index f3cbf63..8f751b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@@ -115,4 +117,10 @@ public interface RMContainer extends EventHandler<RMContainerEvent>,
boolean completed();
NodeId getNodeId();
+
+ /**
+ * Return {@link SchedulingRequest#getAllocationTags()} specified by AM.
+ * @return allocation tags, could be null/empty
+ */
+ Set<String> getAllocationTags();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index e26689e..184cdfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -189,6 +190,9 @@ public class RMContainerImpl implements RMContainer {
private boolean isExternallyAllocated;
private SchedulerRequestKey allocatedSchedulerKey;
+ // TODO, set it when container allocated by scheduler (From SchedulingRequest)
+ private Set<String> allocationTags = null;
+
public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext) {
@@ -501,6 +505,11 @@ public class RMContainerImpl implements RMContainer {
return nodeId;
}
+ @Override
+ public Set<String> getAllocationTags() {
+ return allocationTags;
+ }
+
private static class BaseTransition implements
SingleArcTransition<RMContainerImpl, RMContainerEvent> {
@@ -565,6 +574,12 @@ public class RMContainerImpl implements RMContainer {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
+ // Register the container and its allocation tags in AllocationTagsManager
+ container.rmContext.getAllocationTagsManager().addContainer(
+ container.getNodeId(),
+ container.getApplicationAttemptId().getApplicationId(),
+ container.getContainerId(), container.getAllocationTags());
+
container.eventHandler.handle(new RMAppAttemptEvent(
container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
}
@@ -676,6 +691,12 @@ public class RMContainerImpl implements RMContainer {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
+ // Remove the container and its allocation tags from AllocationTagsManager
+ container.rmContext.getAllocationTagsManager().removeContainer(
+ container.getNodeId(),
+ container.getApplicationAttemptId().getApplicationId(),
+ container.getContainerId(), container.getAllocationTags());
+
RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
container.finishTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java
new file mode 100644
index 0000000..0358792
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/constraint/TestAllocationTagsManager.java
@@ -0,0 +1,328 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.constraint;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test functionality of AllocationTagsManager.
+ */
+public class TestAllocationTagsManager {
+ @Test
+ public void testAllocationTagsManagerSimpleCases()
+ throws InvalidAllocationTagsQueryException {
+ AllocationTagsManager atm = new AllocationTagsManager();
+
+ /**
+ * Construct test case:
+ * Node1:
+ * container_1_1 (mapper/reducer/app_1)
+ * container_1_3 (service/app_1)
+ *
+ * Node2:
+ * container_1_2 (mapper/reducer/app_1)
+ * container_1_4 (reducer/app_1)
+ * container_2_3 (service/app_2)
+ */
+
+ // 4 Containers from app1
+ atm.addContainer(NodeId.fromString("node1:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.addContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.addContainer(NodeId.fromString("node1:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+ ImmutableSet.of("service"));
+
+ atm.addContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+ ImmutableSet.of("reducer"));
+
+ // 1 Container from app2
+ atm.addContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+ ImmutableSet.of("service"));
+
+ // Get Cardinality of app1 on node1, with tag "mapper"
+ Assert.assertEquals(1,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+ Long::max));
+
+ // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min
+ Assert.assertEquals(1,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1),
+ ImmutableSet.of("mapper", "reducer"), Long::min));
+
+ // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max
+ Assert.assertEquals(2,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1),
+ ImmutableSet.of("mapper", "reducer"), Long::max));
+
+ // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
+ Assert.assertEquals(3,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1),
+ ImmutableSet.of("mapper", "reducer"), Long::sum));
+
+ // Get Cardinality by passing single tag.
+ Assert.assertEquals(1,
+ atm.getNodeCardinality(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), "mapper"));
+
+ Assert.assertEquals(2,
+ atm.getNodeCardinality(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), "reducer"));
+
+ // Get Cardinality of app1 on node2, with tag "no_existed/reducer", op=min
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1),
+ ImmutableSet.of("no_existed", "reducer"), Long::min));
+
+ // Get Cardinality of app1 on node2, with tag "<applicationId>", op=max
+ // (Expect this returns #containers from app1 on node2)
+ Assert.assertEquals(2,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), ImmutableSet
+ .of(AllocationTagsNamespaces.APP_ID + TestUtils
+ .getMockApplicationId(1).toString()), Long::max));
+
+ // Get Cardinality of app1 on node2, with empty tag set, op=max
+ Assert.assertEquals(2,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
+
+ // Get Cardinality of all apps on node2, with empty tag set, op=sum
+ Assert.assertEquals(7,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null,
+ ImmutableSet.of(), Long::sum));
+
+ // Get Cardinality of app_1 on node2, with empty tag set, op=sum
+ Assert.assertEquals(5,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
+
+ // Get Cardinality of app_2 on node2, with empty tag set, op=sum
+ Assert.assertEquals(2,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
+
+ // Finish all containers:
+ atm.removeContainer(NodeId.fromString("node1:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.removeContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.removeContainer(NodeId.fromString("node1:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+ ImmutableSet.of("service"));
+
+ atm.removeContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+ ImmutableSet.of("reducer"));
+
+ atm.removeContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+ ImmutableSet.of("service"));
+
+ // Expect all cardinality to be 0
+ // Get Cardinality of app1 on node1, with tag "mapper"
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+ Long::max));
+
+ // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1),
+ ImmutableSet.of("mapper", "reducer"), Long::min));
+
+ // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1),
+ ImmutableSet.of("mapper", "reducer"), Long::max));
+
+ // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1),
+ ImmutableSet.of("mapper", "reducer"), Long::sum));
+
+ // Get Cardinality of app1 on node2, with tag "<applicationId>", op=max
+ // (Expect 0 after removal; NOTE(review): tag has no APP_ID prefix here)
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1),
+ ImmutableSet.of(TestUtils.getMockApplicationId(1).toString()),
+ Long::max));
+
+ Assert.assertEquals(0,
+ atm.getNodeCardinality(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1),
+ TestUtils.getMockApplicationId(1).toString()));
+
+ // Get Cardinality of app1 on node2, with empty tag set, op=max
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
+
+ // Get Cardinality of all apps on node2, with empty tag set, op=sum
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null,
+ ImmutableSet.of(), Long::sum));
+
+ // Get Cardinality of app_1 on node2, with empty tag set, op=sum
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
+
+ // Get Cardinality of app_2 on node2, with empty tag set, op=sum
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
+ }
+
+ @Test
+ public void testAllocationTagsManagerMemoryAfterCleanup()
+ throws InvalidAllocationTagsQueryException {
+ /**
+ * Make sure the manager's mappings are empty once all containers are removed.
+ */
+
+ AllocationTagsManager atm = new AllocationTagsManager();
+
+ // Add containers of app1 and app2 on node1 and node2
+ atm.addContainer(NodeId.fromString("node1:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.addContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.addContainer(NodeId.fromString("node1:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+ ImmutableSet.of("service"));
+
+ atm.addContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+ ImmutableSet.of("reducer"));
+
+ atm.addContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+ ImmutableSet.of("service"));
+
+ // Remove all these containers, with the same node/app/container/tags
+ atm.removeContainer(NodeId.fromString("node1:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.removeContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.removeContainer(NodeId.fromString("node1:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+ ImmutableSet.of("service"));
+
+ atm.removeContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+ ImmutableSet.of("reducer"));
+
+ atm.removeContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+ ImmutableSet.of("service"));
+
+ // Both the global mapping and the per-app mappings must be empty again
+ Assert.assertEquals(0,
+ atm.getGlobalMapping().getNodeToTagsWithCount().size());
+ Assert.assertEquals(0, atm.getPerAppMappings().size());
+ }
+
+ @Test
+ public void testQueryCardinalityWithIllegalParameters()
+ throws InvalidAllocationTagsQueryException {
+ /**
+ * Make sure a cardinality query without a nodeId or without an op is rejected.
+ */
+
+ AllocationTagsManager atm = new AllocationTagsManager();
+
+ // Add a bunch of containers
+ atm.addContainer(NodeId.fromString("node1:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.addContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.addContainer(NodeId.fromString("node1:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+ ImmutableSet.of("service"));
+
+ atm.addContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+ ImmutableSet.of("reducer"));
+
+ atm.addContainer(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+ ImmutableSet.of("service"));
+
+ // No node-id: the query must throw InvalidAllocationTagsQueryException
+ boolean caughtException = false;
+ try {
+ atm.getNodeCardinalityByOp(null, TestUtils.getMockApplicationId(2),
+ ImmutableSet.of("mapper"), Long::min);
+ } catch (InvalidAllocationTagsQueryException e) {
+ caughtException = true;
+ }
+ Assert.assertTrue("should fail because no nodeId specified",
+ caughtException);
+
+ // No op: reset the flag, then expect the same exception type
+ caughtException = false;
+ try {
+ atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
+ TestUtils.getMockApplicationId(2), ImmutableSet.of("mapper"), null);
+ } catch (InvalidAllocationTagsQueryException e) {
+ caughtException = true;
+ }
+ Assert.assertTrue("should fail because no op specified",
+ caughtException);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index 6c189b3..27ff311 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -109,6 +110,8 @@ public class TestRMContainerImpl {
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getRMApps()).thenReturn(rmApps);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
+ AllocationTagsManager ptm = mock(AllocationTagsManager.class);
+ when(rmContext.getAllocationTagsManager()).thenReturn(ptm);
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
@@ -209,6 +212,8 @@ public class TestRMContainerImpl {
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
+ AllocationTagsManager ptm = mock(AllocationTagsManager.class);
+ when(rmContext.getAllocationTagsManager()).thenReturn(ptm);
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(
@@ -367,4 +372,123 @@ public class TestRMContainerImpl {
verify(publisher, times(1)).containerCreated(any(RMContainer.class), anyLong());
verify(publisher, times(1)).containerFinished(any(RMContainer.class), anyLong());
}
+
+ @Test
+ public void testContainerTransitionNotifyPlacementTagsManager()
+ throws Exception {
+ DrainDispatcher drainDispatcher = new DrainDispatcher();
+ EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(
+ EventHandler.class);
+ EventHandler generic = mock(EventHandler.class);
+ drainDispatcher.register(RMAppAttemptEventType.class,
+ appAttemptEventHandler);
+ drainDispatcher.register(RMNodeEventType.class, generic);
+ drainDispatcher.init(new YarnConfiguration());
+ drainDispatcher.start();
+ NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+ appId, 1);
+ ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+ ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
+
+ Resource resource = BuilderUtils.newResource(512, 1);
+ Priority priority = BuilderUtils.newPriority(5);
+
+ Container container = BuilderUtils.newContainer(containerId, nodeId,
+ "host:3465", resource, priority, null);
+ ConcurrentMap<ApplicationId, RMApp> rmApps =
+ spy(new ConcurrentHashMap<ApplicationId, RMApp>());
+ RMApp rmApp = mock(RMApp.class);
+ when(rmApp.getRMAppAttempt(Matchers.any())).thenReturn(null);
+ Mockito.doReturn(rmApp).when(rmApps).get(Matchers.any());
+
+ RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+ SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
+ AllocationTagsManager tagsManager = new AllocationTagsManager();
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+ when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+ when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
+ when(rmContext.getRMApps()).thenReturn(rmApps);
+ when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
+ when(rmContext.getAllocationTagsManager()).thenReturn(tagsManager);
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(
+ YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
+ true);
+ when(rmContext.getYarnConfiguration()).thenReturn(conf);
+
+ /* First container: ALLOCATED -> KILLED; cardinality goes 0 -> 1 -> 0 */
+ RMContainer rmContainer = new RMContainerImpl(container,
+ SchedulerRequestKey.extractFrom(container), appAttemptId,
+ nodeId, "user", rmContext);
+
+ Assert.assertEquals(0,
+ tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+ rmContainer.handle(new RMContainerEvent(containerId,
+ RMContainerEventType.START));
+
+ Assert.assertEquals(1,
+ tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+ rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
+ .newInstance(containerId, ContainerState.COMPLETE, "", 0),
+ RMContainerEventType.KILL));
+
+ Assert.assertEquals(0,
+ tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+ /* Second container: ACQUIRED -> FINISHED; cardinality goes 0 -> 1 -> 0 */
+ rmContainer = new RMContainerImpl(container,
+ SchedulerRequestKey.extractFrom(container), appAttemptId,
+ nodeId, "user", rmContext);
+
+ Assert.assertEquals(0,
+ tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+ rmContainer.handle(new RMContainerEvent(containerId,
+ RMContainerEventType.START));
+
+ Assert.assertEquals(1,
+ tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+ rmContainer.handle(
+ new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
+
+ rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
+ .newInstance(containerId, ContainerState.COMPLETE, "", 0),
+ RMContainerEventType.FINISHED));
+
+ Assert.assertEquals(0,
+ tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+ /* Third container: RUNNING -> FINISHED; cardinality goes 0 -> 1 -> 0 */
+ rmContainer = new RMContainerImpl(container,
+ SchedulerRequestKey.extractFrom(container), appAttemptId,
+ nodeId, "user", rmContext);
+
+ Assert.assertEquals(0,
+ tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+ rmContainer.handle(new RMContainerEvent(containerId,
+ RMContainerEventType.START));
+
+ Assert.assertEquals(1,
+ tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+
+ rmContainer.handle(
+ new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
+
+ rmContainer.handle(
+ new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+
+ rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
+ .newInstance(containerId, ContainerState.COMPLETE, "", 0),
+ RMContainerEventType.FINISHED));
+
+ Assert.assertEquals(0,
+ tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index e3326c7..61a5555 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -135,6 +136,9 @@ public class TestUtils {
new DefaultResourceCalculator());
rmContext.setScheduler(mockScheduler);
+ AllocationTagsManager ptm = mock(AllocationTagsManager.class);
+ rmContext.setAllocationTagsManager(ptm);
+
return rmContext;
}
@@ -234,6 +238,11 @@ public class TestUtils {
doReturn(id).when(containerId).getContainerId();
return containerId;
}
+
+ public static ContainerId getMockContainerId(int appId, int containerId) {
+ return ContainerId.newContainerId(
+ getMockApplicationAttemptId(appId, 1), containerId);
+ }
public static Container getMockContainer(
ContainerId containerId, NodeId nodeId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f037a13/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index 3f97b59..4b902a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -234,6 +235,8 @@ public class TestFifoScheduler {
FifoScheduler scheduler = new FifoScheduler();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler);
+ AllocationTagsManager ptm = mock(AllocationTagsManager.class);
+ rmContext.setAllocationTagsManager(ptm);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
rmContext.setRMApplicationHistoryWriter(
mock(RMApplicationHistoryWriter.class));
@@ -312,12 +315,14 @@ public class TestFifoScheduler {
FifoScheduler scheduler = new FifoScheduler();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler);
+ AllocationTagsManager ptm = mock(AllocationTagsManager.class);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
((RMContextImpl) rmContext).setYarnConfiguration(new YarnConfiguration());
NullRMNodeLabelsManager nlm = new NullRMNodeLabelsManager();
nlm.init(new Configuration());
rmContext.setNodeLabelManager(nlm);
+ rmContext.setAllocationTagsManager(ptm);
scheduler.setRMContext(rmContext);
((RMContextImpl) rmContext).setScheduler(scheduler);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org