You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2021/10/22 15:33:55 UTC
[hadoop] branch trunk updated: YARN-10930. Introduce universal
capacity resource vector. Contributed by Andras Gyori
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 32ecaed YARN-10930. Introduce universal capacity resource vector. Contributed by Andras Gyori
32ecaed is described below
commit 32ecaed9c3c06a48ef01d0437e62e8faccd3e9f3
Author: 9uapaw <gy...@gmail.com>
AuthorDate: Fri Oct 22 17:32:33 2021 +0200
YARN-10930. Introduce universal capacity resource vector. Contributed by Andras Gyori
---
.../scheduler/capacity/AbstractCSQueue.java | 12 +-
.../scheduler/capacity/CSQueue.java | 8 +
.../capacity/CapacitySchedulerConfiguration.java | 19 +-
.../scheduler/capacity/QueueCapacityVector.java | 258 +++++++++++++++++++++
.../scheduler/capacity/ResourceVector.java | 129 +++++++++++
.../capacity/conf/QueueCapacityConfigParser.java | 215 +++++++++++++++++
.../capacity/TestQueueCapacityVector.java | 111 +++++++++
.../scheduler/capacity/TestResourceVector.java | 118 ++++++++++
.../conf/TestQueueCapacityConfigParser.java | 241 +++++++++++++++++++
9 files changed, 1108 insertions(+), 3 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 2d9bf85..e3feb51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -131,6 +131,8 @@ public abstract class AbstractCSQueue implements CSQueue {
protected CapacityConfigType capacityConfigType =
CapacityConfigType.NONE;
+ protected Map<String, QueueCapacityVector> configuredCapacityVectors;
+
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
protected CapacitySchedulerContext csContext;
@@ -374,6 +376,8 @@ public abstract class AbstractCSQueue implements CSQueue {
this.reservationsContinueLooking =
configuration.getReservationContinueLook();
+ this.configuredCapacityVectors = csContext.getConfiguration()
+ .parseConfiguredResourceVector(queuePath, configuredNodeLabels);
// Update metrics
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
@@ -688,6 +692,12 @@ public abstract class AbstractCSQueue implements CSQueue {
minimumAllocation);
}
+  /**
+   * Looks up the capacity vector stored for the given node label in the
+   * vectors parsed from the configuration of this queue.
+   *
+   * @param label node label (partition)
+   * @return the vector stored for the label, or {@code null} if no vector is
+   *         stored for it
+   */
+  @Override
+  public QueueCapacityVector getConfiguredCapacityVector(String label) {
+    QueueCapacityVector vectorOfLabel = configuredCapacityVectors.get(label);
+    return vectorOfLabel;
+  }
+
private void initializeQueueState(CapacitySchedulerConfiguration configuration) {
QueueState previousState = getState();
QueueState configuredState = configuration
@@ -978,7 +988,7 @@ public abstract class AbstractCSQueue implements CSQueue {
"Default lifetime " + defaultAppLifetime
+ " can't exceed maximum lifetime " + myMaxAppLifetime);
}
-
+
if (defaultAppLifetime <= 0) {
defaultAppLifetime = myMaxAppLifetime;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 03a5afb..2acc1d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -420,6 +420,14 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
Resource getEffectiveCapacity(String label);
/**
+ * Get configured capacity resource vector parsed from the capacity config
+ * of the queue.
+ * @param label node label (partition)
+ * @return capacity resource vector
+ */
+ QueueCapacityVector getConfiguredCapacityVector(String label);
+
+ /**
* Get effective capacity of queue. If min/max resource is configured,
* preference will be given to absolute configuration over normal capacity.
* Also round down the result to normalizeDown.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index e7b1cbd..615a4d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.server.resourcemanager.placement.csmappingrule.MappingRule;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.QueueCapacityConfigParser;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.MappingRuleCreator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,9 +74,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.Set;
public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
@@ -413,6 +414,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String MAPPING_RULE_FORMAT_DEFAULT =
MAPPING_RULE_FORMAT_LEGACY;
+
+ private static final QueueCapacityConfigParser queueCapacityConfigParser
+ = new QueueCapacityConfigParser();
+
private ConfigurationProperties configurationProperties;
/**
@@ -454,7 +459,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return PREFIX + "user." + user + DOT;
}
- private String getNodeLabelPrefix(String queue, String label) {
+ public static String getNodeLabelPrefix(String queue, String label) {
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
return getQueuePrefix(queue);
}
@@ -2571,6 +2576,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_CAPACITY);
}
+  /**
+   * Parses the capacity configured for a queue into a capacity vector for
+   * each of the given node labels.
+   *
+   * @param queuePath path of the queue whose capacity is parsed
+   * @param labels node labels the capacity is parsed for
+   * @return a new map holding one parsed capacity vector per label
+   */
+  public Map<String, QueueCapacityVector> parseConfiguredResourceVector(
+      String queuePath, Set<String> labels) {
+    final Map<String, QueueCapacityVector> vectorsByLabel = new HashMap<>();
+    labels.forEach(nodeLabel -> vectorsByLabel.put(nodeLabel,
+        queueCapacityConfigParser.parse(this, queuePath, nodeLabel)));
+    return vectorsByLabel;
+  }
+
private void updateMinMaxResourceToConf(String label, String queue,
Resource resource, String type) {
if (queue.equals("root")) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityVector.java
new file mode 100644
index 0000000..9f6e0e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityVector.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Contains capacity values with calculation types associated for each
+ * resource.
+ *
+ * The values are stored in a {@code ResourceVector}; the capacity type of each
+ * resource is tracked by resource name and, inversely, the resource names are
+ * grouped by capacity type.
+ */
+public class QueueCapacityVector implements
+    Iterable<QueueCapacityVector.QueueCapacityVectorEntry> {
+  private static final String START_PARENTHESES = "[";
+  private static final String END_PARENTHESES = "]";
+  private static final String RESOURCE_DELIMITER = ",";
+  private static final String VALUE_DELIMITER = "=";
+
+  private final ResourceVector resource;
+  // Capacity type of each resource, keyed by resource name.
+  private final Map<String, QueueCapacityType> capacityTypes
+      = new HashMap<>();
+  // Inverse index of capacityTypes: resource names grouped by capacity type.
+  // A type is only kept while at least one resource is defined in it.
+  private final Map<QueueCapacityType, Set<String>> capacityTypePerResource
+      = new HashMap<>();
+
+  /**
+   * Creates an empty {@code QueueCapacityVector} with no resources defined.
+   */
+  public QueueCapacityVector() {
+    this.resource = new ResourceVector();
+  }
+
+  private QueueCapacityVector(ResourceVector resource) {
+    this.resource = resource;
+  }
+
+  /**
+   * Creates a zero {@code QueueCapacityVector}. The resources are defined
+   * in absolute capacity type by default.
+   *
+   * @return zero capacity vector
+   */
+  public static QueueCapacityVector newInstance() {
+    QueueCapacityVector newCapacityVector =
+        new QueueCapacityVector(ResourceVector.newInstance());
+    for (Map.Entry<String, Float> resourceEntry : newCapacityVector.resource) {
+      newCapacityVector.storeResourceType(resourceEntry.getKey(),
+          QueueCapacityType.ABSOLUTE);
+    }
+
+    return newCapacityVector;
+  }
+
+  /**
+   * Creates a uniform and homogeneous {@code QueueCapacityVector}: every
+   * resource of the underlying vector gets the same value and the same
+   * capacity type.
+   *
+   * @param value value to be set for each resource
+   * @param capacityType capacity type to be set for each resource
+   * @return uniform capacity vector
+   */
+  public static QueueCapacityVector of(
+      float value, QueueCapacityType capacityType) {
+    QueueCapacityVector newCapacityVector =
+        new QueueCapacityVector(ResourceVector.of(value));
+    for (Map.Entry<String, Float> resourceEntry : newCapacityVector.resource) {
+      newCapacityVector.storeResourceType(resourceEntry.getKey(), capacityType);
+    }
+
+    return newCapacityVector;
+  }
+
+  /**
+   * Returns the value and capacity type stored for a resource.
+   *
+   * @param resourceName name of the resource
+   * @return entry of the resource; if the resource is not defined in this
+   *         vector, the entry has a zero value and a {@code null} capacity
+   *         type
+   */
+  public QueueCapacityVectorEntry getResource(String resourceName) {
+    return new QueueCapacityVectorEntry(capacityTypes.get(resourceName),
+        resourceName, getValueOrZero(resourceName));
+  }
+
+  /**
+   * Returns the number of resources defined for this vector.
+   *
+   * @return number of resources
+   */
+  public int getResourceCount() {
+    return capacityTypes.size();
+  }
+
+  /**
+   * Set the value and capacity type of a resource.
+   *
+   * @param resourceName name of the resource
+   * @param value value of the resource
+   * @param capacityType type of the resource
+   */
+  public void setResource(String resourceName, float value,
+      QueueCapacityType capacityType) {
+    // Necessary due to backward compatibility (memory = memory-mb)
+    String convertedResourceName = resourceName;
+    if (resourceName.equals("memory")) {
+      convertedResourceName = ResourceInformation.MEMORY_URI;
+    }
+    resource.setValue(convertedResourceName, value);
+    storeResourceType(convertedResourceName, capacityType);
+  }
+
+  /**
+   * A shorthand to retrieve the value stored for the memory resource.
+   *
+   * @return value of memory resource, or zero if memory is not defined in
+   *         this vector
+   */
+  public float getMemory() {
+    return getValueOrZero(ResourceInformation.MEMORY_URI);
+  }
+
+  /**
+   * Returns the name of all resources that are defined in the given capacity
+   * type.
+   *
+   * @param capacityType the capacity type of the resources
+   * @return unmodifiable view of all resource names for the given capacity
+   *         type, empty if no resource is defined in that type
+   */
+  public Set<String> getResourceNamesByCapacityType(
+      QueueCapacityType capacityType) {
+    Set<String> resourceNames = capacityTypePerResource.get(capacityType);
+    if (resourceNames == null) {
+      return Collections.<String>emptySet();
+    }
+    return Collections.unmodifiableSet(resourceNames);
+  }
+
+  /**
+   * Checks whether a resource is defined in the given capacity type.
+   *
+   * @param resourceName name of the resource
+   * @param capacityType capacity type to check against
+   * @return true, if the resource is defined and its capacity type equals the
+   *         given type, false otherwise
+   */
+  public boolean isResourceOfType(
+      String resourceName, QueueCapacityType capacityType) {
+    return capacityTypes.containsKey(resourceName) &&
+        capacityTypes.get(resourceName).equals(capacityType);
+  }
+
+  @Override
+  public Iterator<QueueCapacityVectorEntry> iterator() {
+    return new Iterator<QueueCapacityVectorEntry>() {
+      private final Iterator<Map.Entry<String, Float>> resources =
+          resource.iterator();
+      private int i = 0;
+
+      @Override
+      public boolean hasNext() {
+        return resources.hasNext() && capacityTypes.size() > i;
+      }
+
+      @Override
+      public QueueCapacityVectorEntry next() {
+        Map.Entry<String, Float> resourceInformation = resources.next();
+        i++;
+        return new QueueCapacityVectorEntry(
+            capacityTypes.get(resourceInformation.getKey()),
+            resourceInformation.getKey(), resourceInformation.getValue());
+      }
+    };
+  }
+
+  /**
+   * Returns a set of all capacity types that at least one resource of this
+   * vector is defined in.
+   *
+   * @return unmodifiable view of the capacity types in use
+   */
+  public Set<QueueCapacityType> getDefinedCapacityTypes() {
+    return Collections.unmodifiableSet(capacityTypePerResource.keySet());
+  }
+
+  /**
+   * Reads the value of a resource without risking a failed unboxing: the
+   * underlying vector returns a boxed value, which is absent for resources
+   * that were never set.
+   */
+  private float getValueOrZero(String resourceName) {
+    Float value = resource.getValue(resourceName);
+    return value == null ? 0f : value;
+  }
+
+  /**
+   * Records the capacity type of a resource and keeps the by-type index in
+   * sync with it.
+   */
+  private void storeResourceType(
+      String resourceName, QueueCapacityType resourceType) {
+    QueueCapacityType previousType =
+        capacityTypes.put(resourceName, resourceType);
+    if (previousType != null && previousType != resourceType) {
+      Set<String> previousResources =
+          capacityTypePerResource.get(previousType);
+      if (previousResources != null) {
+        previousResources.remove(resourceName);
+        // Drop the type once its last resource moved away, otherwise it
+        // would still be reported as a defined capacity type.
+        if (previousResources.isEmpty()) {
+          capacityTypePerResource.remove(previousType);
+        }
+      }
+    }
+
+    capacityTypePerResource
+        .computeIfAbsent(resourceType, type -> new HashSet<>())
+        .add(resourceName);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder stringVector = new StringBuilder();
+    stringVector.append(START_PARENTHESES);
+
+    int resourceCount = 0;
+    for (Map.Entry<String, Float> resourceEntry : resource) {
+      resourceCount++;
+      stringVector.append(resourceEntry.getKey())
+          .append(VALUE_DELIMITER)
+          .append(resourceEntry.getValue())
+          .append(capacityTypes.get(resourceEntry.getKey()).postfix);
+      // No delimiter after the last resource
+      if (resourceCount < capacityTypes.size()) {
+        stringVector.append(RESOURCE_DELIMITER);
+      }
+    }
+
+    stringVector.append(END_PARENTHESES);
+
+    return stringVector.toString();
+  }
+
+  /**
+   * Represents a capacity type associated with its syntax postfix.
+   */
+  public enum QueueCapacityType {
+    PERCENTAGE("%"), ABSOLUTE(""), WEIGHT("w");
+    private final String postfix;
+
+    QueueCapacityType(String postfix) {
+      this.postfix = postfix;
+    }
+
+    public String getPostfix() {
+      return postfix;
+    }
+  }
+
+  /**
+   * Immutable holder of the name, value and capacity type of one resource.
+   */
+  public static class QueueCapacityVectorEntry {
+    private final QueueCapacityType vectorResourceType;
+    private final float resourceValue;
+    private final String resourceName;
+
+    public QueueCapacityVectorEntry(QueueCapacityType vectorResourceType,
+        String resourceName, float resourceValue) {
+      this.vectorResourceType = vectorResourceType;
+      this.resourceValue = resourceValue;
+      this.resourceName = resourceName;
+    }
+
+    public QueueCapacityType getVectorResourceType() {
+      return vectorResourceType;
+    }
+
+    public float getResourceValue() {
+      return resourceValue;
+    }
+
+    public String getResourceName() {
+      return resourceName;
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java
new file mode 100644
index 0000000..88c09af
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Represents a simple resource floating point value storage
+ * grouped by resource names.
+ */
+public class ResourceVector implements Iterable<Map.Entry<String, Float>> {
+  private final Map<String, Float> resourcesByName = new HashMap<>();
+
+  /**
+   * Creates a new {@code ResourceVector} with all pre-defined resources set to
+   * zero.
+   * @return zero resource vector
+   */
+  public static ResourceVector newInstance() {
+    return of(0f);
+  }
+
+  /**
+   * Creates a new {@code ResourceVector} with all pre-defined resources set to
+   * the same value.
+   * @param value the value to set all resources to
+   * @return uniform resource vector
+   */
+  public static ResourceVector of(float value) {
+    ResourceVector uniformResourceVector = new ResourceVector();
+    for (ResourceInformation resource : ResourceUtils.getResourceTypesArray()) {
+      uniformResourceVector.setValue(resource.getName(), value);
+    }
+
+    return uniformResourceVector;
+  }
+
+  /**
+   * Creates a new {@code ResourceVector} with the values set in a
+   * {@code Resource} object.
+   * @param resource resource object the resource vector will be based on
+   * @return resource vector holding the value of each resource of the given
+   *         object
+   */
+  public static ResourceVector of(Resource resource) {
+    ResourceVector resourceVector = new ResourceVector();
+    for (ResourceInformation resourceInformation : resource.getResources()) {
+      resourceVector.setValue(resourceInformation.getName(),
+          resourceInformation.getValue());
+    }
+
+    return resourceVector;
+  }
+
+  /**
+   * Subtract values for each resource defined in the given resource vector.
+   * A resource that is not defined in this vector is treated as zero.
+   * @param otherResourceVector rhs resource vector of the subtraction
+   */
+  public void subtract(ResourceVector otherResourceVector) {
+    for (Map.Entry<String, Float> resource : otherResourceVector) {
+      setValue(resource.getKey(),
+          getValueOrZero(resource.getKey()) - resource.getValue());
+    }
+  }
+
+  /**
+   * Increments the given resource by the specified value. A resource that is
+   * not defined in this vector is treated as zero.
+   * @param resourceName name of the resource
+   * @param value value to be added to the resource's current value
+   */
+  public void increment(String resourceName, float value) {
+    setValue(resourceName, getValueOrZero(resourceName) + value);
+  }
+
+  /**
+   * Returns the value stored for a resource.
+   * @param resourceName name of the resource
+   * @return value of the resource, or {@code null} if the resource is not
+   *         defined in this vector
+   */
+  public Float getValue(String resourceName) {
+    return resourcesByName.get(resourceName);
+  }
+
+  /**
+   * Sets the value of a resource, defining the resource if necessary.
+   * @param resourceName name of the resource
+   * @param value value to be stored
+   */
+  public void setValue(String resourceName, float value) {
+    resourcesByName.put(resourceName, value);
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, Float>> iterator() {
+    return resourcesByName.entrySet().iterator();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    return this.resourcesByName.equals(((ResourceVector) o).resourcesByName);
+  }
+
+  @Override
+  public int hashCode() {
+    return resourcesByName.hashCode();
+  }
+
+  /**
+   * Arithmetic on the boxed value of an undefined resource would fail while
+   * unboxing, therefore a missing resource is read as zero.
+   */
+  private float getValueOrZero(String resourceName) {
+    Float value = resourcesByName.get(resourceName);
+    return value == null ? 0f : value;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueCapacityConfigParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueCapacityConfigParser.java
new file mode 100644
index 0000000..28eb33c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueCapacityConfigParser.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A class that parses {@code QueueCapacityVector} from the capacity
+ * configuration property set for a queue.
+ *
+ * A new syntax for capacity property could be implemented, by creating a parser
+ * with a regex to match the pattern and a method that creates a
+ * {@code QueueCapacityVector} from the matched pattern.
+ * Extending the parsers field with a {@code Parser} object in the constructor
+ * is needed in this case.
+ *
+ * A new capacity type for the existing parsers could be added by extending
+ * the {@code QueueCapacityVector.QueueCapacityType} with a new type and its
+ * associated postfix symbol.
+ *
+ * A value that matches one of the syntaxes but does not contain a parsable
+ * number is handled like any other unparsable value: the uniform syntax
+ * yields an empty vector, the resource syntax skips the affected resource.
+ */
+public class QueueCapacityConfigParser {
+  private static final String UNIFORM_REGEX = "^([0-9.]+)(.*)";
+  private static final String RESOURCE_REGEX =
+      "^\\[([\\w\\.,\\-_%\\ /]+=[\\w\\.,\\-_%\\ /]+)+\\]$";
+
+  private static final Pattern RESOURCE_PATTERN = Pattern.compile(RESOURCE_REGEX);
+  private static final Pattern UNIFORM_PATTERN = Pattern.compile(UNIFORM_REGEX);
+  public static final String FLOAT_DIGIT_REGEX = "[0-9.]";
+  // Compiled once, as it is applied to every resource value that is parsed
+  private static final Pattern FLOAT_DIGIT_PATTERN =
+      Pattern.compile(FLOAT_DIGIT_REGEX);
+
+  // Tried in order, the first parser whose regex matches is used
+  private final List<Parser> parsers = new ArrayList<>();
+
+  public QueueCapacityConfigParser() {
+    parsers.add(new Parser(RESOURCE_PATTERN, this::heterogeneousParser));
+    parsers.add(new Parser(UNIFORM_PATTERN, this::uniformParser));
+  }
+
+  /**
+   * Creates a {@code QueueCapacityVector} parsed from the capacity configuration
+   * property set for a queue. The root queue always gets a uniform 100
+   * percent vector, regardless of the configuration.
+   * @param conf configuration object
+   * @param queuePath queue for which the capacity property is parsed
+   * @param label node label
+   * @return a parsed capacity vector, or an empty vector if the property is
+   * not set or could not be parsed
+   */
+  public QueueCapacityVector parse(CapacitySchedulerConfiguration conf,
+      String queuePath, String label) {
+
+    if (queuePath.equals(CapacitySchedulerConfiguration.ROOT)) {
+      return QueueCapacityVector.of(100f, QueueCapacityType.PERCENTAGE);
+    }
+
+    String propertyName = CapacitySchedulerConfiguration.getNodeLabelPrefix(
+        queuePath, label) + CapacitySchedulerConfiguration.CAPACITY;
+    String capacityString = conf.get(propertyName);
+
+    if (capacityString == null) {
+      return new QueueCapacityVector();
+    }
+    // Trim all spaces from capacity string
+    capacityString = capacityString.replaceAll(" ", "");
+
+    for (Parser parser : parsers) {
+      Matcher matcher = parser.regex.matcher(capacityString);
+      if (matcher.find()) {
+        return parser.parser.apply(matcher);
+      }
+    }
+
+    return new QueueCapacityVector();
+  }
+
+  /**
+   * A parser method that is usable on uniform capacity values e.g. percentage or
+   * weight.
+   * @param matcher a regex matcher that contains parsed value and its possible
+   *                suffix
+   * @return a parsed capacity vector, or an empty vector if the suffix is not
+   * a supported one or the value is not a valid number
+   */
+  private QueueCapacityVector uniformParser(Matcher matcher) {
+    QueueCapacityType capacityType = null;
+    String value = matcher.group(1);
+    if (matcher.groupCount() == 2) {
+      String matchedSuffix = matcher.group(2);
+      for (QueueCapacityType suffix : QueueCapacityType.values()) {
+        // Absolute uniform syntax is not supported
+        if (suffix.equals(QueueCapacityType.ABSOLUTE)) {
+          continue;
+        }
+        // when capacity is given in percentage, we do not need % symbol
+        String uniformSuffix = suffix.getPostfix().replaceAll("%", "");
+        if (uniformSuffix.equals(matchedSuffix)) {
+          capacityType = suffix;
+        }
+      }
+    }
+
+    if (capacityType == null) {
+      return new QueueCapacityVector();
+    }
+
+    float parsedValue;
+    try {
+      parsedValue = Float.parseFloat(value);
+    } catch (NumberFormatException ignored) {
+      // The regex only checks the allowed characters, therefore values such
+      // as "1.2.3" or "." get this far. Treat them as unparsable.
+      return new QueueCapacityVector();
+    }
+
+    return QueueCapacityVector.of(parsedValue, capacityType);
+  }
+
+  /**
+   * A parser method that is usable on resource capacity values e.g. mixed or
+   * absolute resource.
+   * @param matcher a regex matcher that contains the matched resource string
+   * @return a parsed capacity vector, or an empty vector if the memory
+   * resource ended up as zero
+   */
+  private QueueCapacityVector heterogeneousParser(Matcher matcher) {
+    QueueCapacityVector capacityVector = QueueCapacityVector.newInstance();
+
+    /*
+     * Resource configuration for a queue is enclosed in "[]" and holds comma
+     * separated key value pairs, e.g. "[memory=4Gi,vcores=2]", which means
+     * "4GB of memory and 2 vcores".
+     */
+    // Get the whole match, including the brackets.
+    String bracketedGroup = matcher.group(0);
+    // Get the string inside starting and closing []
+    bracketedGroup = bracketedGroup.substring(1, bracketedGroup.length() - 1);
+    // Split by comma and equals delimiter eg. the string memory=1024,vcores=6
+    // is converted to an array of array as {{memory,1024}, {vcores, 6}}
+    for (String kvPair : bracketedGroup.trim().split(",")) {
+      String[] splits = kvPair.split("=");
+
+      // Ensure that each sub string is key value pair separated by '='.
+      if (splits.length > 1) {
+        setCapacityVector(capacityVector, splits[0], splits[1]);
+      }
+    }
+
+    // Memory always has to be defined
+    if (capacityVector.getMemory() == 0L) {
+      return new QueueCapacityVector();
+    }
+
+    return capacityVector;
+  }
+
+  /**
+   * Parses the value of a single resource and stores it in the given vector.
+   * The resource is left untouched if the value is malformed.
+   * @param resource capacity vector to store the parsed value in
+   * @param resourceName name of the resource
+   * @param resourceValue number with an optional unit or capacity type suffix
+   */
+  private void setCapacityVector(
+      QueueCapacityVector resource, String resourceName, String resourceValue) {
+    QueueCapacityType capacityType = QueueCapacityType.ABSOLUTE;
+
+    // Extract suffix from a value e.g. for 6w extract w
+    String suffix = FLOAT_DIGIT_PATTERN.matcher(resourceValue).replaceAll("");
+    // The non-numeric characters must form one block at the end of the value
+    if (!resourceValue.endsWith(suffix)) {
+      return;
+    }
+
+    float parsedResourceValue;
+    try {
+      parsedResourceValue = Float.parseFloat(resourceValue.substring(
+          0, resourceValue.length() - suffix.length()));
+    } catch (NumberFormatException ignored) {
+      // The numeric part is empty (e.g. "%") or is not a valid number
+      // (e.g. "1.2.3"). Skip the resource like any other malformed value.
+      return;
+    }
+    float convertedValue = parsedResourceValue;
+
+    if (!suffix.isEmpty() && UnitsConversionUtil.KNOWN_UNITS.contains(suffix)) {
+      // Convert all incoming units to MB if units is configured.
+      // NOTE(review): the cast to long drops the fractional part before the
+      // conversion, e.g. 1.5Gi is converted as 1Gi. Confirm whether this is
+      // intended.
+      convertedValue = UnitsConversionUtil.convert(suffix, "Mi", (long) parsedResourceValue);
+    } else {
+      for (QueueCapacityType capacityTypeSuffix : QueueCapacityType.values()) {
+        if (capacityTypeSuffix.getPostfix().equals(suffix)) {
+          capacityType = capacityTypeSuffix;
+        }
+      }
+    }
+
+    resource.setResource(resourceName, convertedValue, capacityType);
+  }
+
+  /**
+   * Checks whether the given capacity string is in a capacity vector compatible
+   * format.
+   * @param configuredCapacity capacity string
+   * @return true, if capacity string is in capacity vector format,
+   * false otherwise
+   */
+  public boolean isCapacityVectorFormat(String configuredCapacity) {
+    return configuredCapacity != null
+        && RESOURCE_PATTERN.matcher(configuredCapacity).find();
+  }
+
+  /**
+   * Pairs the regex of a capacity syntax with the method that creates a
+   * capacity vector from a match of that regex.
+   */
+  private static class Parser {
+    private final Pattern regex;
+    private final Function<Matcher, QueueCapacityVector> parser;
+
+    Parser(Pattern regex, Function<Matcher, QueueCapacityVector> parser) {
+      this.regex = regex;
+      this.parser = parser;
+    }
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacityVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacityVector.java
new file mode 100644
index 0000000..058e14b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacityVector.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
+
+public class TestQueueCapacityVector {
+  private static final String CUSTOM_RESOURCE = "custom";
+  public static final String MIXED_CAPACITY_VECTOR_STRING =
+      "[custom=3.0,memory-mb=10.0w,vcores=6.0%]";
+
+  private final YarnConfiguration conf = new YarnConfiguration();
+
+  /** Registers "custom" as a resource type before each test. */
+  @Before
+  public void setUp() {
+    conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
+    ResourceUtils.resetResourceTypes(conf);
+  }
+
+  /** Resource names are grouped by the capacity type they are stored with. */
+  @Test
+  public void getResourceNamesByCapacityType() {
+    QueueCapacityVector vector = QueueCapacityVector.newInstance();
+    vector.setResource(MEMORY_URI, 10, QueueCapacityType.PERCENTAGE);
+    vector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE);
+
+    // custom is not set, defaults to 0
+    assertResourceNamesOfType(vector, QueueCapacityType.ABSOLUTE, CUSTOM_RESOURCE);
+    assertResourceNamesOfType(vector, QueueCapacityType.PERCENTAGE, VCORES_URI, MEMORY_URI);
+
+    Assert.assertEquals(10, vector.getResource(MEMORY_URI).getResourceValue(), EPSILON);
+    Assert.assertEquals(6, vector.getResource(VCORES_URI).getResourceValue(), EPSILON);
+  }
+
+  /** Every resource reports the capacity type it was set with. */
+  @Test
+  public void isResourceOfType() {
+    QueueCapacityVector vector = newMixedCapacityVector();
+
+    Assert.assertTrue(vector.isResourceOfType(MEMORY_URI, QueueCapacityType.WEIGHT));
+    Assert.assertTrue(vector.isResourceOfType(VCORES_URI, QueueCapacityType.PERCENTAGE));
+    Assert.assertTrue(vector.isResourceOfType(CUSTOM_RESOURCE, QueueCapacityType.ABSOLUTE));
+  }
+
+  /** newInstance() iterates over three entries, a bare constructor call over none. */
+  @Test
+  public void testIterator() {
+    List<QueueCapacityVectorEntry> entries =
+        Lists.newArrayList(QueueCapacityVector.newInstance());
+    Assert.assertEquals(3, entries.size());
+
+    List<QueueCapacityVectorEntry> emptyEntries =
+        Lists.newArrayList(new QueueCapacityVector());
+    Assert.assertEquals(0, emptyEntries.size());
+  }
+
+  /** toString lists the resources of a mixed vector and prints "[]" when empty. */
+  @Test
+  public void testToString() {
+    Assert.assertEquals(MIXED_CAPACITY_VECTOR_STRING, newMixedCapacityVector().toString());
+    Assert.assertEquals("[]", new QueueCapacityVector().toString());
+  }
+
+  /** Builds a vector with memory as weight, vcores as percentage, custom as absolute. */
+  private static QueueCapacityVector newMixedCapacityVector() {
+    QueueCapacityVector vector = QueueCapacityVector.newInstance();
+    vector.setResource(MEMORY_URI, 10, QueueCapacityType.WEIGHT);
+    vector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE);
+    vector.setResource(CUSTOM_RESOURCE, 3, QueueCapacityType.ABSOLUTE);
+    return vector;
+  }
+
+  /** Asserts that exactly the given resource names are stored with the given type. */
+  private static void assertResourceNamesOfType(QueueCapacityVector vector,
+      QueueCapacityType type, String... expectedNames) {
+    Assert.assertEquals(expectedNames.length, vector.getResourceNamesByCapacityType(type).size());
+    for (String name : expectedNames) {
+      Assert.assertTrue(vector.getResourceNamesByCapacityType(type).contains(name));
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestResourceVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestResourceVector.java
new file mode 100644
index 0000000..fd6edb1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestResourceVector.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
+
+public class TestResourceVector {
+  private final static String CUSTOM_RESOURCE = "custom";
+
+  private final YarnConfiguration conf = new YarnConfiguration();
+
+  /** Registers "custom" as a resource type before each test. */
+  @Before
+  public void setUp() {
+    conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
+    ResourceUtils.resetResourceTypes(conf);
+  }
+
+  /** Covers the three factories: zero vector, uniform vector, copy of a Resource. */
+  @Test
+  public void testCreation() {
+    assertAllValues(0, ResourceVector.newInstance());
+    assertAllValues(10, ResourceVector.of(10));
+
+    // A Resource that carries the custom resource next to memory and vcores
+    Map<String, Long> customResources = new HashMap<>();
+    customResources.put(CUSTOM_RESOURCE, 2L);
+    ResourceVector fromResource =
+        ResourceVector.of(Resource.newInstance(10, 5, customResources));
+
+    Assert.assertEquals(10, fromResource.getValue(MEMORY_URI), EPSILON);
+    Assert.assertEquals(5, fromResource.getValue(VCORES_URI), EPSILON);
+    Assert.assertEquals(2, fromResource.getValue(CUSTOM_RESOURCE), EPSILON);
+  }
+
+  /** subtract is applied to every resource, also close to the float range limit. */
+  @Test
+  public void testSubtract() {
+    ResourceVector minuend = ResourceVector.of(13);
+    minuend.subtract(ResourceVector.of(5));
+    assertAllValues(8, minuend);
+
+    // Check whether overflow causes any issues
+    ResourceVector negative = ResourceVector.of(-100);
+    negative.subtract(ResourceVector.of(Float.MAX_VALUE));
+    assertAllValues(-Float.MAX_VALUE, negative);
+  }
+
+  /** increment changes the named resource only. */
+  @Test
+  public void testIncrement() {
+    ResourceVector vector = ResourceVector.of(13);
+    vector.increment(MEMORY_URI, 5);
+
+    Assert.assertEquals(18, vector.getValue(MEMORY_URI), EPSILON);
+    Assert.assertEquals(13, vector.getValue(VCORES_URI), EPSILON);
+    Assert.assertEquals(13, vector.getValue(CUSTOM_RESOURCE), EPSILON);
+
+    // Check whether overflow causes any issues
+    ResourceVector maxFloatVector = ResourceVector.of(Float.MAX_VALUE);
+    maxFloatVector.increment(MEMORY_URI, 100);
+    Assert.assertEquals(Float.MAX_VALUE, maxFloatVector.getValue(MEMORY_URI), EPSILON);
+  }
+
+  /** equals rejects null, other types and differing values, and accepts equal values. */
+  @Test
+  public void testEquals() {
+    ResourceVector vector = ResourceVector.of(13);
+    ResourceVector other = ResourceVector.of(14);
+
+    Assert.assertNotEquals(null, vector);
+    Assert.assertNotEquals(other, vector);
+    Assert.assertNotEquals(Resource.newInstance(13, 13), vector);
+
+    // Subtracting 1 from every resource brings the second vector down to 13
+    other.subtract(ResourceVector.of(1));
+    Assert.assertEquals(other, vector);
+  }
+
+  /**
+   * Asserts that memory, vcores and the custom resource all hold the same value.
+   * @param expected value every resource must have, within EPSILON
+   * @param vector resource vector under test
+   */
+  private static void assertAllValues(float expected, ResourceVector vector) {
+    Assert.assertEquals(expected, vector.getValue(MEMORY_URI), EPSILON);
+    Assert.assertEquals(expected, vector.getValue(VCORES_URI), EPSILON);
+    Assert.assertEquals(expected, vector.getValue(CUSTOM_RESOURCE), EPSILON);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestQueueCapacityConfigParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestQueueCapacityConfigParser.java
new file mode 100644
index 0000000..1aba816
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestQueueCapacityConfigParser.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
+import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources.GB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
+
+/**
+ * Tests {@link QueueCapacityConfigParser} with percentage, weight, absolute,
+ * mixed and invalid capacity settings of a single queue.
+ */
+public class TestQueueCapacityConfigParser {
+
+  // Capacity vector templates; each placeholder takes the value of one resource
+  private static final String ALL_RESOURCE_TEMPLATE = "[memory-mb=%s, vcores=%s, yarn.io/gpu=%s]";
+  private static final String MEMORY_VCORE_TEMPLATE = "[memory-mb=%s, vcores=%s]";
+
+  // Values of the absolute capacity vector
+  private static final String MEMORY_ABSOLUTE = "12Gi";
+  private static final float VCORE_ABSOLUTE = 6;
+  private static final float GPU_ABSOLUTE = 10;
+
+  // Values of the percentage, mixed and weight configurations
+  private static final float PERCENTAGE_VALUE = 50f;
+  private static final float MEMORY_MIXED = 1024;
+  private static final float WEIGHT_VALUE = 6;
+
+  private static final String QUEUE = "root.test";
+  /** Property key of the test queue's capacity. */
+  private static final String QUEUE_CAPACITY_KEY =
+      CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+          + CapacitySchedulerConfiguration.CAPACITY;
+
+  // Capacity strings assembled from the templates above
+  private static final String ABSOLUTE_RESOURCE = String.format(
+      ALL_RESOURCE_TEMPLATE, MEMORY_ABSOLUTE, VCORE_ABSOLUTE, GPU_ABSOLUTE);
+  private static final String ABSOLUTE_RESOURCE_MEMORY_VCORE = String.format(
+      MEMORY_VCORE_TEMPLATE, MEMORY_ABSOLUTE, VCORE_ABSOLUTE);
+  private static final String MIXED_RESOURCE = String.format(
+      ALL_RESOURCE_TEMPLATE, MEMORY_MIXED, PERCENTAGE_VALUE + "%", WEIGHT_VALUE + "w");
+  private static final String RESOURCE_TYPES = GPU_URI;
+
+  /** Capacity value with a suffix that must not yield any entries. */
+  public static final String NONEXISTINGSUFFIX = "50nonexistingsuffix";
+  /** Vector brackets without any resource definition. */
+  public static final String EMPTY_BRACKET = "[]";
+  /** Vector brackets around text that is not a name=value pair. */
+  public static final String INVALID_CAPACITY_BRACKET = "[invalid]";
+  /** Vector entries that use '-' in place of '='. */
+  public static final String INVALID_CAPACITY_FORMAT = "[memory-100,vcores-60]";
+
+  private final QueueCapacityConfigParser capacityConfigParser
+      = new QueueCapacityConfigParser();
+
+  /**
+   * A plain percentage capacity is applied to memory and vcores alike, and the
+   * root queue, whose capacity this test does not set, is parsed as 100 percent.
+   */
+  @Test
+  public void testPercentageCapacityConfig() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.setCapacity(QUEUE, PERCENTAGE_VALUE);
+
+    QueueCapacityVector percentageCapacityVector = capacityConfigParser.parse(conf, QUEUE,
+        NO_LABEL);
+    assertEntry(QueueCapacityType.PERCENTAGE, PERCENTAGE_VALUE,
+        percentageCapacityVector.getResource(MEMORY_URI));
+    assertEntry(QueueCapacityType.PERCENTAGE, PERCENTAGE_VALUE,
+        percentageCapacityVector.getResource(VCORES_URI));
+
+    // root's capacity is not set by this test
+    QueueCapacityVector rootCapacityVector = capacityConfigParser.parse(conf,
+        CapacitySchedulerConfiguration.ROOT, NO_LABEL);
+    assertEntry(QueueCapacityType.PERCENTAGE, 100f,
+        rootCapacityVector.getResource(MEMORY_URI));
+    assertEntry(QueueCapacityType.PERCENTAGE, 100f,
+        rootCapacityVector.getResource(VCORES_URI));
+  }
+
+  /**
+   * A queue weight set through setNonLabeledQueueWeight is reported as WEIGHT
+   * for both memory and vcores.
+   */
+  @Test
+  public void testWeightCapacityConfig() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.setNonLabeledQueueWeight(QUEUE, WEIGHT_VALUE);
+
+    QueueCapacityVector weightCapacityVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
+    assertEntry(QueueCapacityType.WEIGHT, WEIGHT_VALUE,
+        weightCapacityVector.getResource(MEMORY_URI));
+    assertEntry(QueueCapacityType.WEIGHT, WEIGHT_VALUE,
+        weightCapacityVector.getResource(VCORES_URI));
+  }
+
+  /**
+   * Absolute vectors keep one value per resource, with the "12Gi" memory value
+   * expected as 12 * GB. A resource type missing from the configured string
+   * is still part of the vector, with value 0.
+   */
+  @Test
+  public void testAbsoluteCapacityVectorConfig() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.set(QUEUE_CAPACITY_KEY, ABSOLUTE_RESOURCE);
+    conf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_TYPES);
+    ResourceUtils.resetResourceTypes(conf);
+
+    QueueCapacityVector absoluteCapacityVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
+    assertEntry(QueueCapacityType.ABSOLUTE, 12 * GB,
+        absoluteCapacityVector.getResource(MEMORY_URI));
+    assertEntry(QueueCapacityType.ABSOLUTE, VCORE_ABSOLUTE,
+        absoluteCapacityVector.getResource(VCORES_URI));
+    assertEntry(QueueCapacityType.ABSOLUTE, GPU_ABSOLUTE,
+        absoluteCapacityVector.getResource(GPU_URI));
+
+    // gpu is left out of the configured vector string
+    conf.set(QUEUE_CAPACITY_KEY, ABSOLUTE_RESOURCE_MEMORY_VCORE);
+    QueueCapacityVector withoutGpuVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
+
+    Assert.assertEquals(3, withoutGpuVector.getResourceCount());
+    Assert.assertEquals(0f, withoutGpuVector.getResource(GPU_URI).getResourceValue(), EPSILON);
+  }
+
+  /**
+   * One vector may mix capacity types: here memory is absolute, vcores is a
+   * percentage and gpu is a weight.
+   */
+  @Test
+  public void testMixedCapacityConfig() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.set(QUEUE_CAPACITY_KEY, MIXED_RESOURCE);
+    conf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_TYPES);
+    ResourceUtils.resetResourceTypes(conf);
+
+    QueueCapacityVector mixedCapacityVector =
+        capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
+    assertEntry(QueueCapacityType.ABSOLUTE, MEMORY_MIXED,
+        mixedCapacityVector.getResource(MEMORY_URI));
+    assertEntry(QueueCapacityType.PERCENTAGE, PERCENTAGE_VALUE,
+        mixedCapacityVector.getResource(VCORES_URI));
+    assertEntry(QueueCapacityType.WEIGHT, WEIGHT_VALUE,
+        mixedCapacityVector.getResource(GPU_URI));
+
+    // Test undefined capacity type default value
+    conf.set(QUEUE_CAPACITY_KEY, ABSOLUTE_RESOURCE_MEMORY_VCORE);
+
+    QueueCapacityVector mixedCapacityVectorWithGpuUndefined =
+        capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
+    Assert.assertEquals(QueueCapacityType.ABSOLUTE,
+        mixedCapacityVectorWithGpuUndefined.getResource(MEMORY_URI).getVectorResourceType());
+    Assert.assertEquals(0, mixedCapacityVectorWithGpuUndefined.getResource(GPU_URI)
+        .getResourceValue(), EPSILON);
+  }
+
+  /**
+   * Malformed, empty and unset capacity values must all be parsed into a
+   * vector that has no entries.
+   */
+  @Test
+  public void testInvalidCapacityConfigs() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+
+    conf.set(QUEUE_CAPACITY_KEY, NONEXISTINGSUFFIX);
+    Assert.assertEquals("Unknown suffix must not produce entries",
+        0, parseEntries(conf).size());
+
+    conf.set(QUEUE_CAPACITY_KEY, INVALID_CAPACITY_FORMAT);
+    Assert.assertEquals("Wrong delimiter must not produce entries",
+        0, parseEntries(conf).size());
+
+    conf.set(QUEUE_CAPACITY_KEY, INVALID_CAPACITY_BRACKET);
+    Assert.assertEquals("Bracket without name=value pairs must not produce entries",
+        0, parseEntries(conf).size());
+
+    conf.set(QUEUE_CAPACITY_KEY, EMPTY_BRACKET);
+    Assert.assertEquals("Empty bracket must not produce entries",
+        0, parseEntries(conf).size());
+
+    conf.set(QUEUE_CAPACITY_KEY, "");
+    Assert.assertEquals("Empty capacity must not produce entries",
+        0, parseEntries(conf).size());
+
+    conf.unset(QUEUE_CAPACITY_KEY);
+    Assert.assertEquals("Unset capacity must not produce entries",
+        0, parseEntries(conf).size());
+  }
+
+  /**
+   * Parses the capacity of the test queue and collects the resulting entries.
+   * @param conf configuration holding the capacity setting to parse
+   * @return entries of the parsed capacity vector, in iteration order
+   */
+  private List<QueueCapacityVectorEntry> parseEntries(CapacitySchedulerConfiguration conf) {
+    QueueCapacityVector capacityVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
+    return Lists.newArrayList(capacityVector.iterator());
+  }
+
+  /**
+   * Asserts both the capacity type and the value of a single vector entry.
+   * @param expectedType capacity type the entry must have
+   * @param expectedValue value the entry must have, within EPSILON
+   * @param entry entry returned by the parsed capacity vector
+   */
+  private static void assertEntry(QueueCapacityType expectedType, float expectedValue,
+      QueueCapacityVectorEntry entry) {
+    Assert.assertEquals(expectedType, entry.getVectorResourceType());
+    Assert.assertEquals(expectedValue, entry.getResourceValue(), EPSILON);
+  }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org