You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2021/10/22 15:33:55 UTC

[hadoop] branch trunk updated: YARN-10930. Introduce universal capacity resource vector. Contributed by Andras Gyori

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 32ecaed  YARN-10930. Introduce universal capacity resource vector. Contributed by Andras Gyori
32ecaed is described below

commit 32ecaed9c3c06a48ef01d0437e62e8faccd3e9f3
Author: 9uapaw <gy...@gmail.com>
AuthorDate: Fri Oct 22 17:32:33 2021 +0200

    YARN-10930. Introduce universal capacity resource vector. Contributed by Andras Gyori
---
 .../scheduler/capacity/AbstractCSQueue.java        |  12 +-
 .../scheduler/capacity/CSQueue.java                |   8 +
 .../capacity/CapacitySchedulerConfiguration.java   |  19 +-
 .../scheduler/capacity/QueueCapacityVector.java    | 258 +++++++++++++++++++++
 .../scheduler/capacity/ResourceVector.java         | 129 +++++++++++
 .../capacity/conf/QueueCapacityConfigParser.java   | 215 +++++++++++++++++
 .../capacity/TestQueueCapacityVector.java          | 111 +++++++++
 .../scheduler/capacity/TestResourceVector.java     | 118 ++++++++++
 .../conf/TestQueueCapacityConfigParser.java        | 241 +++++++++++++++++++
 9 files changed, 1108 insertions(+), 3 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 2d9bf85..e3feb51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -131,6 +131,8 @@ public abstract class AbstractCSQueue implements CSQueue {
   protected CapacityConfigType capacityConfigType =
       CapacityConfigType.NONE;
 
+  protected Map<String, QueueCapacityVector> configuredCapacityVectors;
+
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
   protected CapacitySchedulerContext csContext;
@@ -374,6 +376,8 @@ public abstract class AbstractCSQueue implements CSQueue {
 
       this.reservationsContinueLooking =
           configuration.getReservationContinueLook();
+      this.configuredCapacityVectors = csContext.getConfiguration()
+          .parseConfiguredResourceVector(queuePath, configuredNodeLabels);
 
       // Update metrics
       CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
@@ -688,6 +692,12 @@ public abstract class AbstractCSQueue implements CSQueue {
         minimumAllocation);
   }
 
+  /**
+   * Looks up the capacity vector that was parsed from the capacity
+   * configuration of this queue for the given node label.
+   *
+   * @param label node label (partition)
+   * @return the vector stored for the label, or {@code null} if the map
+   *         holds no vector for it
+   */
+  @Override
+  public QueueCapacityVector getConfiguredCapacityVector(String label) {
+    return this.configuredCapacityVectors.get(label);
+  }
+
   private void initializeQueueState(CapacitySchedulerConfiguration configuration) {
     QueueState previousState = getState();
     QueueState configuredState = configuration
@@ -978,7 +988,7 @@ public abstract class AbstractCSQueue implements CSQueue {
           "Default lifetime " + defaultAppLifetime
               + " can't exceed maximum lifetime " + myMaxAppLifetime);
     }
-    
+
     if (defaultAppLifetime <= 0) {
       defaultAppLifetime = myMaxAppLifetime;
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 03a5afb..2acc1d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -420,6 +420,14 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
   Resource getEffectiveCapacity(String label);
 
   /**
+   * Get configured capacity resource vector parsed from the capacity config
+   * of the queue.
+   * @param label node label (partition)
+   * @return capacity resource vector
+   */
+  QueueCapacityVector getConfiguredCapacityVector(String label);
+
+  /**
    * Get effective capacity of queue. If min/max resource is configured,
    * preference will be given to absolute configuration over normal capacity.
    * Also round down the result to normalizeDown.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index e7b1cbd..615a4d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.csmappingrule.MappingRule;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.QueueCapacityConfigParser;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.MappingRuleCreator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,9 +74,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.Set;
 
 public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
 
@@ -413,6 +414,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
 
   public static final String MAPPING_RULE_FORMAT_DEFAULT =
       MAPPING_RULE_FORMAT_LEGACY;
+
+  private static final QueueCapacityConfigParser queueCapacityConfigParser
+      = new QueueCapacityConfigParser();
+
   private ConfigurationProperties configurationProperties;
 
   /**
@@ -454,7 +459,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     return PREFIX + "user." + user + DOT;
   }
 
-  private String getNodeLabelPrefix(String queue, String label) {
+  public static String getNodeLabelPrefix(String queue, String label) {
     if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
       return getQueuePrefix(queue);
     }
@@ -2571,6 +2576,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_CAPACITY);
   }
 
+  /**
+   * Parses the configured capacity of a queue into one capacity vector per
+   * node label.
+   *
+   * @param queuePath path of the queue whose capacity is parsed
+   * @param labels node labels for which the capacity is parsed
+   * @return the parsed capacity vectors keyed by node label
+   */
+  public Map<String, QueueCapacityVector> parseConfiguredResourceVector(
+      String queuePath, Set<String> labels) {
+    final Map<String, QueueCapacityVector> vectorsByLabel = new HashMap<>();
+    labels.forEach(nodeLabel -> vectorsByLabel.put(nodeLabel,
+        queueCapacityConfigParser.parse(this, queuePath, nodeLabel)));
+    return vectorsByLabel;
+  }
+
   private void updateMinMaxResourceToConf(String label, String queue,
       Resource resource, String type) {
     if (queue.equals("root")) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityVector.java
new file mode 100644
index 0000000..9f6e0e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacityVector.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Contains capacity values with calculation types associated for each
+ * resource.
+ */
+public class QueueCapacityVector implements
+    Iterable<QueueCapacityVector.QueueCapacityVectorEntry> {
+  private static final String START_PARENTHESES = "[";
+  private static final String END_PARENTHESES = "]";
+  private static final String RESOURCE_DELIMITER = ",";
+  private static final String VALUE_DELIMITER = "=";
+
+  // Numeric value of each resource, keyed by resource name.
+  private final ResourceVector resource;
+  // Capacity type of each resource, keyed by resource name.
+  private final Map<String, QueueCapacityType> capacityTypes
+      = new HashMap<>();
+  // Reverse index of capacityTypes: resource names grouped by capacity type.
+  // A capacity type is only kept while at least one resource uses it.
+  private final Map<QueueCapacityType, Set<String>> capacityTypePerResource
+      = new HashMap<>();
+
+  /**
+   * Creates an empty capacity vector that has no resources defined.
+   */
+  public QueueCapacityVector() {
+    this.resource = new ResourceVector();
+  }
+
+  private QueueCapacityVector(ResourceVector resource) {
+    this.resource = resource;
+  }
+
+  /**
+   * Creates a zero {@code QueueCapacityVector}. The resources are defined
+   * in absolute capacity type by default.
+   *
+   * @return zero capacity vector
+   */
+  public static QueueCapacityVector newInstance() {
+    QueueCapacityVector newCapacityVector =
+        new QueueCapacityVector(ResourceVector.newInstance());
+    for (Map.Entry<String, Float> resourceEntry : newCapacityVector.resource) {
+      newCapacityVector.storeResourceType(resourceEntry.getKey(),
+          QueueCapacityType.ABSOLUTE);
+    }
+
+    return newCapacityVector;
+  }
+
+  /**
+   * Creates a uniform and homogeneous {@code QueueCapacityVector}: every
+   * resource gets the same value and the same capacity type.
+   *
+   * @param value value to be set for each resource
+   * @param capacityType capacity type to be set for each resource
+   * @return uniform capacity vector
+   */
+  public static QueueCapacityVector of(
+      float value, QueueCapacityType capacityType) {
+    QueueCapacityVector newCapacityVector =
+        new QueueCapacityVector(ResourceVector.of(value));
+    for (Map.Entry<String, Float> resourceEntry : newCapacityVector.resource) {
+      newCapacityVector.storeResourceType(resourceEntry.getKey(), capacityType);
+    }
+
+    return newCapacityVector;
+  }
+
+  /**
+   * Returns the value and capacity type stored for a resource.
+   *
+   * @param resourceName name of the resource
+   * @return entry holding the name, value and capacity type of the resource;
+   *         for a resource that is not defined in this vector the value is 0
+   *         and the capacity type is {@code null}
+   */
+  public QueueCapacityVectorEntry getResource(String resourceName) {
+    return new QueueCapacityVectorEntry(capacityTypes.get(resourceName),
+        resourceName, getValueOrZero(resourceName));
+  }
+
+  /**
+   * Returns the number of resources defined for this vector.
+   *
+   * @return number of resources
+   */
+  public int getResourceCount() {
+    return capacityTypes.size();
+  }
+
+  /**
+   * Set the value and capacity type of a resource.
+   *
+   * @param resourceName name of the resource
+   * @param value        value of the resource
+   * @param capacityType type of the resource
+   */
+  public void setResource(String resourceName, float value,
+                          QueueCapacityType capacityType) {
+    // Necessary due to backward compatibility (memory = memory-mb)
+    String convertedResourceName = resourceName;
+    if (resourceName.equals("memory")) {
+      convertedResourceName = ResourceInformation.MEMORY_URI;
+    }
+    resource.setValue(convertedResourceName, value);
+    storeResourceType(convertedResourceName, capacityType);
+  }
+
+  /**
+   * A shorthand to retrieve the value stored for the memory resource.
+   *
+   * @return value of memory resource, or 0 if memory is not defined in this
+   *         vector
+   */
+  public float getMemory() {
+    return getValueOrZero(ResourceInformation.MEMORY_URI);
+  }
+
+  /**
+   * Returns the name of all resources that are defined in the given capacity
+   * type.
+   *
+   * @param capacityType the capacity type of the resources
+   * @return unmodifiable view of all resource names for the given capacity
+   *         type, empty if no resource uses the type
+   */
+  public Set<String> getResourceNamesByCapacityType(
+      QueueCapacityType capacityType) {
+    // Never hand out the internal mutable set: modifying it would desync
+    // capacityTypePerResource from capacityTypes.
+    return Collections.unmodifiableSet(capacityTypePerResource.getOrDefault(
+        capacityType, Collections.emptySet()));
+  }
+
+  /**
+   * Checks whether a resource is defined with the given capacity type.
+   *
+   * @param resourceName name of the resource
+   * @param capacityType capacity type to compare against
+   * @return true if the resource is defined in this vector and its capacity
+   *         type equals the given one, false otherwise
+   */
+  public boolean isResourceOfType(
+      String resourceName, QueueCapacityType capacityType) {
+    return capacityTypes.containsKey(resourceName) &&
+        capacityTypes.get(resourceName).equals(capacityType);
+  }
+
+  /**
+   * Iterates over the resources of this vector, pairing each value of the
+   * underlying resource vector with its capacity type.
+   *
+   * @return iterator of resource entries
+   */
+  @Override
+  public Iterator<QueueCapacityVectorEntry> iterator() {
+    return new Iterator<QueueCapacityVectorEntry>() {
+      private final Iterator<Map.Entry<String, Float>> resources =
+          resource.iterator();
+      // Number of entries handed out so far.
+      private int i = 0;
+
+      @Override
+      public boolean hasNext() {
+        // Never return more entries than there are capacity types stored.
+        return resources.hasNext() && capacityTypes.size() > i;
+      }
+
+      @Override
+      public QueueCapacityVectorEntry next() {
+        Map.Entry<String, Float> resourceInformation = resources.next();
+        i++;
+        return new QueueCapacityVectorEntry(
+            capacityTypes.get(resourceInformation.getKey()),
+            resourceInformation.getKey(), resourceInformation.getValue());
+      }
+    };
+  }
+
+  /**
+   * Returns a set of all capacity type defined for this vector.
+   *
+   * @return unmodifiable view of the capacity types that are used by at
+   *         least one resource
+   */
+  public Set<QueueCapacityType> getDefinedCapacityTypes() {
+    return Collections.unmodifiableSet(capacityTypePerResource.keySet());
+  }
+
+  /**
+   * Records the capacity type of a resource in both indexes. If the resource
+   * had a different type before, it is removed from the group of the old type.
+   *
+   * @param resourceName name of the resource
+   * @param resourceType new capacity type of the resource
+   */
+  private void storeResourceType(
+      String resourceName, QueueCapacityType resourceType) {
+    QueueCapacityType previousType =
+        capacityTypes.put(resourceName, resourceType);
+    if (previousType != null && !previousType.equals(resourceType)) {
+      Set<String> previousNames = capacityTypePerResource.get(previousType);
+      previousNames.remove(resourceName);
+      // Forget capacity types that no resource uses anymore, otherwise
+      // getDefinedCapacityTypes() would keep reporting the stale type.
+      if (previousNames.isEmpty()) {
+        capacityTypePerResource.remove(previousType);
+      }
+    }
+
+    capacityTypePerResource
+        .computeIfAbsent(resourceType, type -> new HashSet<>())
+        .add(resourceName);
+  }
+
+  /**
+   * Returns the value stored for a resource without risking a
+   * {@code NullPointerException} on unboxing.
+   *
+   * @param resourceName name of the resource
+   * @return the stored value, or 0 if the resource is not defined
+   */
+  private float getValueOrZero(String resourceName) {
+    Float value = resource.getValue(resourceName);
+    return value == null ? 0f : value;
+  }
+
+  /**
+   * Formats the vector as a bracketed, comma separated list of
+   * {@code name=value} pairs, each value followed by the postfix of its
+   * capacity type, e.g. {@code [memory-mb=10.0w,vcores=6.0%]}. The resources
+   * appear in the iteration order of the underlying resource vector.
+   *
+   * @return string representation of the vector
+   */
+  @Override
+  public String toString() {
+    StringBuilder stringVector = new StringBuilder();
+    stringVector.append(START_PARENTHESES);
+
+    int resourceCount = 0;
+    for (Map.Entry<String, Float> resourceEntry : resource) {
+      resourceCount++;
+      stringVector.append(resourceEntry.getKey())
+          .append(VALUE_DELIMITER)
+          .append(resourceEntry.getValue())
+          .append(capacityTypes.get(resourceEntry.getKey()).postfix);
+      // No delimiter after the last resource.
+      if (resourceCount < capacityTypes.size()) {
+        stringVector.append(RESOURCE_DELIMITER);
+      }
+    }
+
+    stringVector.append(END_PARENTHESES);
+
+    return stringVector.toString();
+  }
+
+  /**
+   * Represents a capacity type associated with its syntax postfix.
+   */
+  public enum QueueCapacityType {
+    PERCENTAGE("%"), ABSOLUTE(""), WEIGHT("w");
+    // Symbol that follows the numeric value in the string form of a vector.
+    private final String postfix;
+
+    QueueCapacityType(String postfix) {
+      this.postfix = postfix;
+    }
+
+    public String getPostfix() {
+      return postfix;
+    }
+  }
+
+  /**
+   * Immutable holder of the name, value and capacity type of one resource.
+   */
+  public static class QueueCapacityVectorEntry {
+    private final QueueCapacityType vectorResourceType;
+    private final float resourceValue;
+    private final String resourceName;
+
+    /**
+     * @param vectorResourceType capacity type of the resource
+     * @param resourceName name of the resource
+     * @param resourceValue value of the resource
+     */
+    public QueueCapacityVectorEntry(QueueCapacityType vectorResourceType,
+                                    String resourceName, float resourceValue) {
+      this.vectorResourceType = vectorResourceType;
+      this.resourceValue = resourceValue;
+      this.resourceName = resourceName;
+    }
+
+    public QueueCapacityType getVectorResourceType() {
+      return vectorResourceType;
+    }
+
+    public float getResourceValue() {
+      return resourceValue;
+    }
+
+    public String getResourceName() {
+      return resourceName;
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java
new file mode 100644
index 0000000..88c09af
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ResourceVector.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Represents a simple resource floating point value storage
+ * grouped by resource names.
+ */
+public class ResourceVector implements Iterable<Map.Entry<String, Float>> {
+  private final Map<String, Float> resourcesByName = new HashMap<>();
+
+  /**
+   * Creates a new {@code ResourceVector} with all pre-defined resources set to
+   * zero.
+   * @return zero resource vector
+   */
+  public static ResourceVector newInstance() {
+    return of(0f);
+  }
+
+  /**
+   * Creates a new {@code ResourceVector} with all pre-defined resources set to
+   * the same value.
+   * @param value the value to set all resources to
+   * @return uniform resource vector
+   */
+  public static ResourceVector of(float value) {
+    ResourceVector uniformResourceVector = new ResourceVector();
+    for (ResourceInformation resource : ResourceUtils.getResourceTypesArray()) {
+      uniformResourceVector.setValue(resource.getName(), value);
+    }
+
+    return uniformResourceVector;
+  }
+
+  /**
+   * Creates a new {@code ResourceVector} with the values set in a
+   * {@code Resource} object.
+   * @param resource resource object the resource vector will be based on
+   * @return resource vector holding the value of each resource of the given
+   *         {@code Resource}
+   */
+  public static ResourceVector of(Resource resource) {
+    ResourceVector resourceVector = new ResourceVector();
+    for (ResourceInformation resourceInformation : resource.getResources()) {
+      resourceVector.setValue(resourceInformation.getName(),
+          resourceInformation.getValue());
+    }
+
+    return resourceVector;
+  }
+
+  /**
+   * Subtract values for each resource defined in the given resource vector.
+   * A resource that is not yet defined in this vector is treated as zero.
+   * @param otherResourceVector rhs resource vector of the subtraction
+   */
+  public void subtract(ResourceVector otherResourceVector) {
+    for (Map.Entry<String, Float> resource : otherResourceVector) {
+      setValue(resource.getKey(),
+          getValueOrZero(resource.getKey()) - resource.getValue());
+    }
+  }
+
+  /**
+   * Increments the given resource by the specified value. A resource that is
+   * not yet defined in this vector is treated as zero.
+   * @param resourceName name of the resource
+   * @param value value to be added to the resource's current value
+   */
+  public void increment(String resourceName, float value) {
+    setValue(resourceName, getValueOrZero(resourceName) + value);
+  }
+
+  /**
+   * Returns the value stored for a resource.
+   * @param resourceName name of the resource
+   * @return the stored value, or {@code null} if the resource is not defined
+   *         in this vector
+   */
+  public Float getValue(String resourceName) {
+    return resourcesByName.get(resourceName);
+  }
+
+  /**
+   * Stores a value for a resource, replacing any value stored before.
+   * @param resourceName name of the resource
+   * @param value value of the resource
+   */
+  public void setValue(String resourceName, float value) {
+    resourcesByName.put(resourceName, value);
+  }
+
+  /**
+   * Iterates over the entries of the backing map, i.e. resource names paired
+   * with their values.
+   * @return iterator of the stored resources
+   */
+  @Override
+  public Iterator<Map.Entry<String, Float>> iterator() {
+    return resourcesByName.entrySet().iterator();
+  }
+
+  // Avoids a NullPointerException on unboxing the value of a resource that
+  // has not been set in this vector.
+  private float getValueOrZero(String resourceName) {
+    Float value = resourcesByName.get(resourceName);
+    return value == null ? 0f : value;
+  }
+
+  /**
+   * Two resource vectors are equal if they store the same values for the same
+   * resource names.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    return this.resourcesByName.equals(((ResourceVector) o).resourcesByName);
+  }
+
+  @Override
+  public int hashCode() {
+    return resourcesByName.hashCode();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueCapacityConfigParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueCapacityConfigParser.java
new file mode 100644
index 0000000..28eb33c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueCapacityConfigParser.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A class that parses {@code QueueCapacityVector} from the capacity
+ * configuration property set for a queue.
+ *
+ * A new syntax for capacity property could be implemented, by creating a parser
+ * with a regex to match the pattern and a method that creates a
+ * {@code QueueCapacityVector} from the matched pattern.
+ * Extending the parsers field with a {@code Parser} object in the constructor
+ * is needed in this case.
+ *
+ * A new capacity type for the existing parsers could be added by extending
+ * the {@code QueueCapacityVector.QueueCapacityType} with a new type and its
+ * associated postfix symbol.
+ */
+public class QueueCapacityConfigParser {
+  private static final String UNIFORM_REGEX = "^([0-9.]+)(.*)";
+  private static final String RESOURCE_REGEX = "^\\[([\\w\\.,\\-_%\\ /]+=[\\w\\.,\\-_%\\ /]+)+\\]$";
+
+  private static final Pattern RESOURCE_PATTERN = Pattern.compile(RESOURCE_REGEX);
+  private static final Pattern UNIFORM_PATTERN = Pattern.compile(UNIFORM_REGEX);
+  public static final String FLOAT_DIGIT_REGEX = "[0-9.]";
+
+  private final List<Parser> parsers = new ArrayList<>();
+
+  public QueueCapacityConfigParser() {
+    parsers.add(new Parser(RESOURCE_PATTERN, this::heterogeneousParser));
+    parsers.add(new Parser(UNIFORM_PATTERN, this::uniformParser));
+  }
+
+  /**
+   * Creates a {@code QueueCapacityVector} parsed from the capacity configuration
+   * property set for a queue.
+   * @param conf configuration object
+   * @param queuePath queue for which the capacity property is parsed
+   * @param label node label
+   * @return a parsed capacity vector
+   */
+  public QueueCapacityVector parse(CapacitySchedulerConfiguration conf,
+                                   String queuePath, String label) {
+
+    if (queuePath.equals(CapacitySchedulerConfiguration.ROOT)) {
+      return QueueCapacityVector.of(100f, QueueCapacityType.PERCENTAGE);
+    }
+
+    String propertyName = CapacitySchedulerConfiguration.getNodeLabelPrefix(
+        queuePath, label) + CapacitySchedulerConfiguration.CAPACITY;
+    String capacityString = conf.get(propertyName);
+
+    if (capacityString == null) {
+      return new QueueCapacityVector();
+    }
+    // Trim all spaces from capacity string
+    capacityString = capacityString.replaceAll(" ", "");
+
+    for (Parser parser : parsers) {
+      Matcher matcher = parser.regex.matcher(capacityString);
+      if (matcher.find()) {
+        return parser.parser.apply(matcher);
+      }
+    }
+
+    return new QueueCapacityVector();
+  }
+
+  /**
+   * A parser method that is usable on uniform capacity values e.g. percentage or
+   * weight.
+   * @param matcher a regex matcher that contains parsed value and its possible
+   *                suffix
+   * @return a parsed capacity vector
+   */
+  private QueueCapacityVector uniformParser(Matcher matcher) {
+    QueueCapacityType capacityType = null;
+    String value = matcher.group(1);
+    if (matcher.groupCount() == 2) {
+      String matchedSuffix = matcher.group(2);
+      for (QueueCapacityType suffix : QueueCapacityType.values()) {
+        // Absolute uniform syntax is not supported
+        if (suffix.equals(QueueCapacityType.ABSOLUTE)) {
+          continue;
+        }
+        // when capacity is given in percentage, we do not need % symbol
+        String uniformSuffix = suffix.getPostfix().replaceAll("%", "");
+        if (uniformSuffix.equals(matchedSuffix)) {
+          capacityType = suffix;
+        }
+      }
+    }
+
+    if (capacityType == null) {
+      return new QueueCapacityVector();
+    }
+
+    return QueueCapacityVector.of(Float.parseFloat(value), capacityType);
+  }
+
+  /**
+   * A parser method that is usable on resource capacity values e.g. mixed or
+   * absolute resource.
+   * @param matcher a regex matcher that contains the matched resource string
+   * @return a parsed capacity vector
+   */
+  private QueueCapacityVector heterogeneousParser(Matcher matcher) {
+    QueueCapacityVector capacityVector = QueueCapacityVector.newInstance();
+
+    /*
+     * Absolute resource configuration for a queue will be grouped by "[]".
+     * Syntax of absolute resource config could be like below
+     * "memory=4Gi vcores=2". Ideally this means "4GB of memory and 2 vcores".
+     */
+    // Get the sub-group.
+    String bracketedGroup = matcher.group(0);
+    // Get the string inside starting and closing []
+    bracketedGroup = bracketedGroup.substring(1, bracketedGroup.length() - 1);
+    // Split by comma and equals delimiter eg. the string memory=1024,vcores=6
+    // is converted to an array of array as {{memory,1024}, {vcores, 6}}
+    for (String kvPair : bracketedGroup.trim().split(",")) {
+      String[] splits = kvPair.split("=");
+
+      // Ensure that each sub string is key value pair separated by '='.
+      if (splits.length > 1) {
+        setCapacityVector(capacityVector, splits[0], splits[1]);
+      }
+    }
+
+    // Memory always have to be defined
+    if (capacityVector.getMemory() == 0L) {
+      return new QueueCapacityVector();
+    }
+
+    return capacityVector;
+  }
+
+  private void setCapacityVector(
+      QueueCapacityVector resource, String resourceName, String resourceValue) {
+    QueueCapacityType capacityType = QueueCapacityType.ABSOLUTE;
+
+    // Extract suffix from a value e.g. for 6w extract w
+    String suffix = resourceValue.replaceAll(FLOAT_DIGIT_REGEX, "");
+    if (!resourceValue.endsWith(suffix)) {
+      return;
+    }
+
+    float parsedResourceValue = Float.parseFloat(resourceValue.substring(
+        0, resourceValue.length() - suffix.length()));
+    float convertedValue = parsedResourceValue;
+
+    if (!suffix.isEmpty() && UnitsConversionUtil.KNOWN_UNITS.contains(suffix)) {
+      // Convert all incoming units to MB if units is configured.
+      convertedValue = UnitsConversionUtil.convert(suffix, "Mi", (long) parsedResourceValue);
+    } else {
+      for (QueueCapacityType capacityTypeSuffix : QueueCapacityType.values()) {
+        if (capacityTypeSuffix.getPostfix().equals(suffix)) {
+          capacityType = capacityTypeSuffix;
+        }
+      }
+    }
+
+    resource.setResource(resourceName, convertedValue, capacityType);
+  }
+
+  /**
+   * Checks whether the given capacity string is in a capacity vector compatible
+   * format.
+   * @param configuredCapacity capacity string
+   * @return true, if capacity string is in capacity vector format,
+   * false otherwise
+   */
+  public boolean isCapacityVectorFormat(String configuredCapacity) {
+    return configuredCapacity != null
+        && RESOURCE_PATTERN.matcher(configuredCapacity).find();
+  }
+
+  private static class Parser {
+    private final Pattern regex;
+    private final Function<Matcher, QueueCapacityVector> parser;
+
+    Parser(Pattern regex, Function<Matcher, QueueCapacityVector> parser) {
+      this.regex = regex;
+      this.parser = parser;
+    }
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacityVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacityVector.java
new file mode 100644
index 0000000..058e14b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacityVector.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
+
+public class TestQueueCapacityVector {
+  private static final String CUSTOM_RESOURCE = "custom";
+  public static final String MIXED_CAPACITY_VECTOR_STRING =
+      "[custom=3.0,memory-mb=10.0w,vcores=6.0%]";
+
+  private final YarnConfiguration conf = new YarnConfiguration();
+
+  /** Registers the custom resource type before every test. */
+  @Before
+  public void setUp() {
+    conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
+    ResourceUtils.resetResourceTypes(conf);
+  }
+
+  /** Checks the names listed per capacity type, including the unset custom resource. */
+  @Test
+  public void getResourceNamesByCapacityType() {
+    QueueCapacityVector vector = QueueCapacityVector.newInstance();
+    vector.setResource(MEMORY_URI, 10, QueueCapacityType.PERCENTAGE);
+    vector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE);
+
+    // custom is not set, defaults to 0
+    assertNamesOfType(vector, QueueCapacityType.ABSOLUTE, CUSTOM_RESOURCE);
+    assertNamesOfType(vector, QueueCapacityType.PERCENTAGE, VCORES_URI, MEMORY_URI);
+
+    Assert.assertEquals(10, vector.getResource(MEMORY_URI).getResourceValue(), EPSILON);
+    Assert.assertEquals(6, vector.getResource(VCORES_URI).getResourceValue(), EPSILON);
+  }
+
+  /** Checks that each resource reports the capacity type it was configured with. */
+  @Test
+  public void isResourceOfType() {
+    QueueCapacityVector mixedVector = newMixedCapacityVector();
+    Assert.assertTrue(mixedVector.isResourceOfType(MEMORY_URI, QueueCapacityType.WEIGHT));
+    Assert.assertTrue(mixedVector.isResourceOfType(VCORES_URI, QueueCapacityType.PERCENTAGE));
+    Assert.assertTrue(mixedVector.isResourceOfType(CUSTOM_RESOURCE, QueueCapacityType.ABSOLUTE));
+  }
+
+  /** Checks the number of entries yielded when iterating over a vector. */
+  @Test
+  public void testIterator() {
+    List<QueueCapacityVectorEntry> entries = Lists.newArrayList(QueueCapacityVector.newInstance());
+    Assert.assertEquals(3, entries.size());
+
+    List<QueueCapacityVectorEntry> emptyEntries = Lists.newArrayList(new QueueCapacityVector());
+    Assert.assertEquals(0, emptyEntries.size());
+  }
+
+  /** Checks the string form of a mixed vector and of an empty one. */
+  @Test
+  public void testToString() {
+    String mixedVectorString = newMixedCapacityVector().toString();
+    Assert.assertEquals(MIXED_CAPACITY_VECTOR_STRING, mixedVectorString);
+
+    String emptyVectorString = new QueueCapacityVector().toString();
+    Assert.assertEquals("[]", emptyVectorString);
+  }
+
+  /** Builds a vector with weight memory, percentage vcores and absolute custom values. */
+  private static QueueCapacityVector newMixedCapacityVector() {
+    QueueCapacityVector vector = QueueCapacityVector.newInstance();
+    vector.setResource(MEMORY_URI, 10, QueueCapacityType.WEIGHT);
+    vector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE);
+    vector.setResource(CUSTOM_RESOURCE, 3, QueueCapacityType.ABSOLUTE);
+    return vector;
+  }
+
+  /** Asserts the count of names of the given type and that each expected name is listed. */
+  private static void assertNamesOfType(
+      QueueCapacityVector vector, QueueCapacityType type, String... expectedNames) {
+    Assert.assertEquals(expectedNames.length, vector.getResourceNamesByCapacityType(type).size());
+    for (String name : expectedNames) {
+      Assert.assertTrue(vector.getResourceNamesByCapacityType(type).contains(name));
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestResourceVector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestResourceVector.java
new file mode 100644
index 0000000..fd6edb1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestResourceVector.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
+
+public class TestResourceVector {
+  private final static String CUSTOM_RESOURCE = "custom";
+
+  private final YarnConfiguration conf = new YarnConfiguration();
+
+  @Before
+  public void setUp() {
+    conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
+    ResourceUtils.resetResourceTypes(conf);
+  }
+
+  @Test
+  public void testCreation() {
+    ResourceVector zeroResourceVector = ResourceVector.newInstance();
+    Assert.assertEquals(0, zeroResourceVector.getValue(MEMORY_URI), EPSILON);
+    Assert.assertEquals(0, zeroResourceVector.getValue(VCORES_URI), EPSILON);
+    Assert.assertEquals(0, zeroResourceVector.getValue(CUSTOM_RESOURCE), EPSILON);
+
+    ResourceVector uniformResourceVector = ResourceVector.of(10);
+    Assert.assertEquals(10, uniformResourceVector.getValue(MEMORY_URI), EPSILON);
+    Assert.assertEquals(10, uniformResourceVector.getValue(VCORES_URI), EPSILON);
+    Assert.assertEquals(10, uniformResourceVector.getValue(CUSTOM_RESOURCE), EPSILON);
+
+    Map<String, Long> customResources = new HashMap<>();
+    customResources.put(CUSTOM_RESOURCE, 2L);
+    Resource resource = Resource.newInstance(10, 5, customResources);
+    ResourceVector resourceVectorFromResource = ResourceVector.of(resource);
+    Assert.assertEquals(10, resourceVectorFromResource.getValue(MEMORY_URI), EPSILON);
+    Assert.assertEquals(5, resourceVectorFromResource.getValue(VCORES_URI), EPSILON);
+    Assert.assertEquals(2, resourceVectorFromResource.getValue(CUSTOM_RESOURCE), EPSILON);
+  }
+
+  @Test
+  public void testSubtract() {
+    ResourceVector lhsResourceVector = ResourceVector.of(13);
+    ResourceVector rhsResourceVector = ResourceVector.of(5);
+    lhsResourceVector.subtract(rhsResourceVector);
+
+    Assert.assertEquals(8, lhsResourceVector.getValue(MEMORY_URI), EPSILON);
+    Assert.assertEquals(8, lhsResourceVector.getValue(VCORES_URI), EPSILON);
+    Assert.assertEquals(8, lhsResourceVector.getValue(CUSTOM_RESOURCE), EPSILON);
+
+    ResourceVector negativeResourceVector = ResourceVector.of(-100);
+
+    // Check whether overflow causes any issues
+    negativeResourceVector.subtract(ResourceVector.of(Float.MAX_VALUE));
+    Assert.assertEquals(-Float.MAX_VALUE, negativeResourceVector.getValue(MEMORY_URI), EPSILON);
+    Assert.assertEquals(-Float.MAX_VALUE, negativeResourceVector.getValue(VCORES_URI), EPSILON);
+    Assert.assertEquals(-Float.MAX_VALUE, negativeResourceVector.getValue(CUSTOM_RESOURCE),
+        EPSILON);
+
+  }
+
+  @Test
+  public void testIncrement() {
+    ResourceVector resourceVector = ResourceVector.of(13);
+    resourceVector.increment(MEMORY_URI, 5);
+
+    Assert.assertEquals(18, resourceVector.getValue(MEMORY_URI), EPSILON);
+    Assert.assertEquals(13, resourceVector.getValue(VCORES_URI), EPSILON);
+    Assert.assertEquals(13, resourceVector.getValue(CUSTOM_RESOURCE), EPSILON);
+
+    // Check whether overflow causes any issues
+    ResourceVector maxFloatResourceVector = ResourceVector.of(Float.MAX_VALUE);
+    maxFloatResourceVector.increment(MEMORY_URI, 100);
+    Assert.assertEquals(Float.MAX_VALUE, maxFloatResourceVector.getValue(MEMORY_URI), EPSILON);
+  }
+
+  @Test
+  public void testEquals() {
+    ResourceVector resourceVector = ResourceVector.of(13);
+    ResourceVector resourceVectorOther = ResourceVector.of(14);
+    Resource resource = Resource.newInstance(13, 13);
+
+    Assert.assertNotEquals(null, resourceVector);
+    Assert.assertNotEquals(resourceVectorOther, resourceVector);
+    Assert.assertNotEquals(resource, resourceVector);
+
+    ResourceVector resourceVectorOne = ResourceVector.of(1);
+    resourceVectorOther.subtract(resourceVectorOne);
+
+    Assert.assertEquals(resourceVectorOther, resourceVector);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestQueueCapacityConfigParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestQueueCapacityConfigParser.java
new file mode 100644
index 0000000..1aba816
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestQueueCapacityConfigParser.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
+import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources.GB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
+
+/**
+ * Tests how QueueCapacityConfigParser turns percentage, weight, absolute,
+ * mixed and invalid capacity settings into capacity vectors.
+ */
+public class TestQueueCapacityConfigParser {
+
+  private static final String ALL_RESOURCE_TEMPLATE = "[memory-mb=%s, vcores=%s, yarn.io/gpu=%s]";
+  private static final String MEMORY_VCORE_TEMPLATE = "[memory-mb=%s, vcores=%s]";
+
+  private static final String MEMORY_ABSOLUTE = "12Gi";
+  private static final float VCORE_ABSOLUTE = 6;
+  private static final float GPU_ABSOLUTE = 10;
+
+  private static final float PERCENTAGE_VALUE = 50f;
+  private static final float MEMORY_MIXED = 1024;
+  private static final float WEIGHT_VALUE = 6;
+
+  private static final String QUEUE = "root.test";
+  // Configuration key under which the capacity of the test queue is stored.
+  private static final String CAPACITY_KEY =
+      CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+          + CapacitySchedulerConfiguration.CAPACITY;
+
+  private static final String ABSOLUTE_RESOURCE = String.format(
+      ALL_RESOURCE_TEMPLATE, MEMORY_ABSOLUTE, VCORE_ABSOLUTE, GPU_ABSOLUTE);
+  private static final String ABSOLUTE_RESOURCE_MEMORY_VCORE = String.format(
+      MEMORY_VCORE_TEMPLATE, MEMORY_ABSOLUTE, VCORE_ABSOLUTE);
+  private static final String MIXED_RESOURCE = String.format(
+      ALL_RESOURCE_TEMPLATE, MEMORY_MIXED, PERCENTAGE_VALUE + "%", WEIGHT_VALUE + "w");
+  private static final String RESOURCE_TYPES = GPU_URI;
+
+  public static final String NONEXISTINGSUFFIX = "50nonexistingsuffix";
+  public static final String EMPTY_BRACKET = "[]";
+  public static final String INVALID_CAPACITY_BRACKET = "[invalid]";
+  public static final String INVALID_CAPACITY_FORMAT = "[memory-100,vcores-60]";
+
+  private final QueueCapacityConfigParser capacityConfigParser
+      = new QueueCapacityConfigParser();
+
+  /**
+   * Verifies that a plain percentage capacity is applied to memory and vcores,
+   * and that the root queue is parsed as 100 percent for both.
+   */
+  @Test
+  public void testPercentageCapacityConfig() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.setCapacity(QUEUE, PERCENTAGE_VALUE);
+
+    QueueCapacityVector percentageCapacityVector = capacityConfigParser.parse(conf, QUEUE,
+        NO_LABEL);
+    QueueCapacityVectorEntry memory = percentageCapacityVector.getResource(MEMORY_URI);
+    QueueCapacityVectorEntry vcore = percentageCapacityVector.getResource(VCORES_URI);
+
+    Assert.assertEquals(QueueCapacityType.PERCENTAGE, memory.getVectorResourceType());
+    Assert.assertEquals(PERCENTAGE_VALUE, memory.getResourceValue(), EPSILON);
+
+    Assert.assertEquals(QueueCapacityType.PERCENTAGE, vcore.getVectorResourceType());
+    Assert.assertEquals(PERCENTAGE_VALUE, vcore.getResourceValue(), EPSILON);
+
+    QueueCapacityVector rootCapacityVector = capacityConfigParser.parse(conf,
+        CapacitySchedulerConfiguration.ROOT, NO_LABEL);
+
+    QueueCapacityVectorEntry memoryRoot = rootCapacityVector.getResource(MEMORY_URI);
+    QueueCapacityVectorEntry vcoreRoot = rootCapacityVector.getResource(VCORES_URI);
+
+    Assert.assertEquals(QueueCapacityType.PERCENTAGE, memoryRoot.getVectorResourceType());
+    Assert.assertEquals(100f, memoryRoot.getResourceValue(), EPSILON);
+
+    Assert.assertEquals(QueueCapacityType.PERCENTAGE, vcoreRoot.getVectorResourceType());
+    Assert.assertEquals(100f, vcoreRoot.getResourceValue(), EPSILON);
+  }
+
+  /**
+   * Verifies that a queue weight is parsed as WEIGHT type for memory and vcores.
+   */
+  @Test
+  public void testWeightCapacityConfig() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.setNonLabeledQueueWeight(QUEUE, WEIGHT_VALUE);
+
+    QueueCapacityVector weightCapacityVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
+
+    QueueCapacityVectorEntry memory = weightCapacityVector.getResource(MEMORY_URI);
+    QueueCapacityVectorEntry vcore = weightCapacityVector.getResource(VCORES_URI);
+
+    Assert.assertEquals(QueueCapacityType.WEIGHT, memory.getVectorResourceType());
+    Assert.assertEquals(WEIGHT_VALUE, memory.getResourceValue(), EPSILON);
+
+    Assert.assertEquals(QueueCapacityType.WEIGHT, vcore.getVectorResourceType());
+    Assert.assertEquals(WEIGHT_VALUE, vcore.getResourceValue(), EPSILON);
+  }
+
+  /**
+   * Verifies absolute resource vectors, including unit conversion of memory and
+   * a zero default for a resource type that is left out of the vector.
+   */
+  @Test
+  public void testAbsoluteCapacityVectorConfig() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.set(CAPACITY_KEY, ABSOLUTE_RESOURCE);
+    conf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_TYPES);
+    ResourceUtils.resetResourceTypes(conf);
+
+    QueueCapacityVector absoluteCapacityVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
+
+    Assert.assertEquals(QueueCapacityType.ABSOLUTE, absoluteCapacityVector.getResource(MEMORY_URI)
+        .getVectorResourceType());
+    Assert.assertEquals(12 * GB, absoluteCapacityVector.getResource(MEMORY_URI)
+        .getResourceValue(), EPSILON);
+
+    Assert.assertEquals(QueueCapacityType.ABSOLUTE, absoluteCapacityVector.getResource(VCORES_URI)
+        .getVectorResourceType());
+    Assert.assertEquals(VCORE_ABSOLUTE, absoluteCapacityVector.getResource(VCORES_URI)
+        .getResourceValue(), EPSILON);
+
+    Assert.assertEquals(QueueCapacityType.ABSOLUTE, absoluteCapacityVector.getResource(GPU_URI)
+        .getVectorResourceType());
+    Assert.assertEquals(GPU_ABSOLUTE, absoluteCapacityVector.getResource(GPU_URI)
+        .getResourceValue(), EPSILON);
+
+    // GPU is left out of this vector although it is a configured resource type
+    conf.set(CAPACITY_KEY, ABSOLUTE_RESOURCE_MEMORY_VCORE);
+    QueueCapacityVector withoutGpuVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
+
+    Assert.assertEquals(3, withoutGpuVector.getResourceCount());
+    Assert.assertEquals(0f, withoutGpuVector.getResource(GPU_URI).getResourceValue(), EPSILON);
+  }
+
+  /**
+   * Verifies a vector mixing absolute, percentage and weight entries, and the
+   * zero default for a resource type that is left out of the vector.
+   */
+  @Test
+  public void testMixedCapacityConfig() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.set(CAPACITY_KEY, MIXED_RESOURCE);
+    conf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_TYPES);
+    ResourceUtils.resetResourceTypes(conf);
+
+    QueueCapacityVector mixedCapacityVector =
+        capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
+
+    Assert.assertEquals(QueueCapacityType.ABSOLUTE,
+        mixedCapacityVector.getResource(MEMORY_URI).getVectorResourceType());
+    Assert.assertEquals(MEMORY_MIXED, mixedCapacityVector.getResource(MEMORY_URI)
+        .getResourceValue(), EPSILON);
+
+    Assert.assertEquals(QueueCapacityType.PERCENTAGE,
+        mixedCapacityVector.getResource(VCORES_URI).getVectorResourceType());
+    Assert.assertEquals(PERCENTAGE_VALUE,
+        mixedCapacityVector.getResource(VCORES_URI).getResourceValue(), EPSILON);
+
+    Assert.assertEquals(QueueCapacityType.WEIGHT,
+        mixedCapacityVector.getResource(GPU_URI).getVectorResourceType());
+    Assert.assertEquals(WEIGHT_VALUE,
+        mixedCapacityVector.getResource(GPU_URI).getResourceValue(), EPSILON);
+
+    // Test undefined capacity type default value
+    conf.set(CAPACITY_KEY, ABSOLUTE_RESOURCE_MEMORY_VCORE);
+
+    QueueCapacityVector mixedCapacityVectorWithGpuUndefined =
+        capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
+    Assert.assertEquals(QueueCapacityType.ABSOLUTE,
+        mixedCapacityVectorWithGpuUndefined.getResource(MEMORY_URI).getVectorResourceType());
+    Assert.assertEquals(0, mixedCapacityVectorWithGpuUndefined.getResource(GPU_URI)
+        .getResourceValue(), EPSILON);
+  }
+
+  /**
+   * Verifies that malformed, empty or unset capacity values are all parsed into
+   * a capacity vector without any entries.
+   */
+  @Test
+  public void testInvalidCapacityConfigs() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    String[] invalidCapacities = {
+        NONEXISTINGSUFFIX, INVALID_CAPACITY_FORMAT, INVALID_CAPACITY_BRACKET,
+        EMPTY_BRACKET, ""};
+
+    for (String invalidCapacity : invalidCapacities) {
+      conf.set(CAPACITY_KEY, invalidCapacity);
+      assertParsedVectorIsEmpty(conf);
+    }
+
+    // A queue without any configured capacity must behave the same way.
+    conf.unset(CAPACITY_KEY);
+    assertParsedVectorIsEmpty(conf);
+  }
+
+  /**
+   * Parses the capacity of the test queue and asserts that the resulting
+   * capacity vector yields no entries.
+   * @param conf configuration holding the capacity value under test
+   */
+  private void assertParsedVectorIsEmpty(CapacitySchedulerConfiguration conf) {
+    QueueCapacityVector capacityVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
+    List<QueueCapacityVectorEntry> entries = Lists.newArrayList(capacityVector.iterator());
+    // Expected value first, so a failure message reports the sizes correctly.
+    Assert.assertEquals("Unexpected entries for capacity: " + conf.get(CAPACITY_KEY),
+        0, entries.size());
+  }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org