You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2018/08/16 15:58:24 UTC
hadoop git commit: YARN-7953. [GQ] Data structures for federation
global queues calculations. Contributed by Abhishek Modi.
Repository: hadoop
Updated Branches:
refs/heads/YARN-7402 91dd58b76 -> 717874a16
YARN-7953. [GQ] Data structures for federation global queues calculations. Contributed by Abhishek Modi.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/717874a1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/717874a1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/717874a1
Branch: refs/heads/YARN-7402
Commit: 717874a166aa22f8334dda72b35c55dbad1eccf7
Parents: 91dd58b
Author: Botong Huang <bo...@apache.org>
Authored: Thu Aug 16 08:28:35 2018 -0700
Committer: Botong Huang <bo...@apache.org>
Committed: Thu Aug 16 08:28:35 2018 -0700
----------------------------------------------------------------------
.../pom.xml | 3 +
...ederationGlobalQueueValidationException.java | 28 +
.../globalqueues/FederationGlobalView.java | 198 +++++
.../globalqueues/FederationQueue.java | 761 +++++++++++++++++++
.../globalqueues/package-info.java | 17 +
.../globalqueues/GlobalQueueTestUtil.java | 133 ++++
.../globalqueues/TestFederationQueue.java | 98 +++
.../resources/globalqueues/basic-queue.json | 9 +
.../globalqueues/tree-queue-adaptable.json | 96 +++
.../test/resources/globalqueues/tree-queue.json | 128 ++++
10 files changed, 1471 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/717874a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
index c137c9e..f0097af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
@@ -108,6 +108,9 @@
<excludes>
<exclude>src/test/resources/schedulerInfo1.json</exclude>
<exclude>src/test/resources/schedulerInfo2.json</exclude>
+ <exclude>src/test/resources/globalqueues/basic-queue.json</exclude>
+ <exclude>src/test/resources/globalqueues/tree-queue.json</exclude>
+ <exclude>src/test/resources/globalqueues/tree-queue-adaptable.json</exclude>
</excludes>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/717874a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/FederationGlobalQueueValidationException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/FederationGlobalQueueValidationException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/FederationGlobalQueueValidationException.java
new file mode 100644
index 0000000..3a18763
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/FederationGlobalQueueValidationException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues;
+
+/**
+ * Exception thrown when FederationQueue is not valid. It is raised by the
+ * validate() methods of FederationQueue and FederationGlobalView.
+ */
+public class FederationGlobalQueueValidationException extends Exception {
+
+ /**
+ * Creates the exception.
+ *
+ * @param s message describing the violated invariant.
+ */
+ public FederationGlobalQueueValidationException(String s) {
+ super(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/717874a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/FederationGlobalView.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/FederationGlobalView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/FederationGlobalView.java
new file mode 100644
index 0000000..45668e0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/FederationGlobalView.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class represents a set of root queues (one for each sub-cluster) of a
+ * federation.
+ */
+public class FederationGlobalView implements Cloneable {
+ private ResourceCalculator rc;
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(FederationGlobalView.class);
+
+ private String name;
+ private FederationQueue global;
+ private List<FederationQueue> subClusters;
+ private Configuration conf;
+
+ public FederationGlobalView(){
+ subClusters = new ArrayList<>();
+ }
+
+ public FederationGlobalView(Configuration config, ResourceCalculator rc) {
+ this();
+ this.conf=config;
+ this.rc=rc;
+ }
+
+ public FederationGlobalView(Configuration config, ResourceCalculator rc,
+ List<FederationQueue> subClusters) {
+ this(config, rc);
+ setSubClusters(subClusters);
+ globalFromLocal();
+ }
+
+ /**
+  * Checks that the queue invariants are respected by the global queue (when
+  * one is set) and by every sub-cluster root queue.
+  *
+  * @throws FederationGlobalQueueValidationException upon the first violation
+  *           found; the failure is logged before being rethrown.
+  */
+ public void validate() throws FederationGlobalQueueValidationException {
+   try {
+     if (global != null) {
+       global.validate();
+     }
+     for (FederationQueue f : subClusters) {
+       f.validate();
+     }
+   } catch (FederationGlobalQueueValidationException f) {
+     // Hand the exception to the logger as the trailing argument so that the
+     // violation message and stack trace are logged next to the queue dump.
+     LOG.error("Error in validating {}", this.toQuickString(), f);
+     throw f;
+   }
+ }
+
+ /**
+ * Returns a FederationQueue matching the queueName
+ * from the specified subClusters.
+ *
+ * @param queueName
+ * @param subClusterName
+ * @return FederationQueue corresponding to the queueName and subCluster
+ */
+ public FederationQueue getQueue(String queueName, String subClusterName) {
+ for (FederationQueue f : subClusters) {
+ if (f.getSubClusterId().equals(
+ SubClusterId.newInstance(subClusterName))) {
+ return f.getChildByName(queueName);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Get name of the FederationGlobalView
+ * @return name of the global view
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Set name of the FederationGlobalView
+ * @param name global view name
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Get global view subclusters
+ * @return subclusters associated with global view
+ */
+ public List<FederationQueue> getSubClusters() {
+ return subClusters;
+ }
+
+ /**
+ * Set global view subclusters
+ * @param subClusters subclusters associated with global view
+ */
+ public void setSubClusters(List<FederationQueue> subClusters) {
+ this.subClusters = subClusters;
+ }
+
+ /**
+  * Builds the global queue by merging the root queues of all sub-clusters
+  * and stores it as the global queue of this view.
+  */
+ public void globalFromLocal() {
+   SubClusterId globalScope = SubClusterId.newInstance("global");
+   FederationQueue merged =
+       FederationQueue.mergeQueues(getSubClusters(), globalScope);
+   // the total capacity of the merged root is the component-wise max of its
+   // guaranteed and max capacity; it is then propagated through the tree
+   merged.setTotCap(Resources.componentwiseMax(
+       merged.getGuarCap(), merged.getMaxCap()));
+   merged.propagateCapacities();
+   setGlobal(merged);
+ }
+
+ public String toString() {
+ return toQuickString();
+ }
+
+ /**
+  * Builds a compact, printable dump of the view: the quick string of each
+  * sub-cluster root queue, each followed by a newline.
+  *
+  * @return the concatenated quick strings of the sub-cluster queues
+  */
+ public String toQuickString() {
+   StringBuilder out = new StringBuilder();
+   for (FederationQueue subCluster : subClusters) {
+     out.append(subCluster.toQuickString()).append("\n");
+   }
+   return out.toString();
+ }
+
+ /**
+ * Returns global queue associated with the view.
+ * @return global queue.
+ */
+ public FederationQueue getGlobal() {
+ return global;
+ }
+
+ /**
+ * Set global queue for FederationGlobalView
+ * @param global queue for FederationGlobalView
+ */
+ public void setGlobal(FederationQueue global) {
+ this.global = global;
+ }
+
+ /**
+  * Simply initializes the roots to zero preemption: the global queue (when
+  * one is set) and every sub-cluster root get an empty to-be-preempted
+  * resource.
+  */
+ protected void initializeRootPreemption() {
+   // global stays null until globalFromLocal()/setGlobal() is called;
+   // validate() treats it as optional too, so do not fail with an NPE here.
+   if (global != null) {
+     global.setToBePreempted(Resource.newInstance(0, 0));
+   }
+   for (FederationQueue lr : subClusters) {
+     lr.setToBePreempted(Resource.newInstance(0, 0));
+   }
+ }
+
+ /**
+  * Creates a copy of this view in which the global queue and the sub-cluster
+  * queues are recursive clones. The remaining fields are copied as-is by
+  * Object.clone().
+  *
+  * @return the cloned view
+  * @throws CloneNotSupportedException if Object.clone() refuses the copy
+  */
+ @Override
+ public FederationGlobalView clone() throws CloneNotSupportedException {
+   FederationGlobalView copy = (FederationGlobalView) super.clone();
+   // a view on which globalFromLocal()/setGlobal() was never called has no
+   // global queue yet; keep it unset in the copy instead of throwing an NPE
+   copy.setGlobal(global != null ? global.clone(true) : null);
+   List<FederationQueue> clonedSubClusters =
+       new ArrayList<>(getSubClusters().size());
+   for (FederationQueue localRoot : getSubClusters()) {
+     clonedSubClusters.add(localRoot.clone(true));
+   }
+   copy.setSubClusters(clonedSubClusters);
+   return copy;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/717874a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/FederationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/FederationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/FederationQueue.java
new file mode 100644
index 0000000..9c6d6e6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/FederationQueue.java
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_RESOURCE_CALCULATOR_CLASS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class represents a tree of queues in a sub-cluster YarnRM.
+ * Useful to communicate with GPG and global policies.
+ */
+public class FederationQueue implements Iterable<FederationQueue> {
+
+ private String queueName;
+ private String queueType;
+
+ // sub-cluster to which queue belongs.
+ private SubClusterId subClusterId;
+
+ // capacities associated with queue.
+ private Resource totCap;
+ private Resource guarCap;
+ private Resource maxCap;
+ private Resource usedCap;
+ private Resource demandCap;
+ private Resource idealAlloc;
+
+ //resource that can be preempted in this queue.
+ private Resource toBePreempted;
+
+ // Used only for testing (to embed expected behavior)
+ private Resource testExpectedIdealAlloc;
+
+ private Map<String, FederationQueue> children;
+ private ResourceCalculator rc;
+ private Resource totalUnassigned;
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(FederationQueue.class);
+
+ private Configuration conf;
+
+ public FederationQueue() {
+ this(new Configuration());
+ }
+
+ public FederationQueue(Configuration conf) {
+ this(conf,
+ ReflectionUtils.newInstance(
+ conf.getClass(RESOURCE_CALCULATOR_CLASS,
+ DEFAULT_RESOURCE_CALCULATOR_CLASS, ResourceCalculator.class),
+ conf));
+ }
+
+ public FederationQueue(Configuration conf, ResourceCalculator rc) {
+ this.conf = conf;
+ children = new HashMap<>();
+ this.rc = rc;
+ }
+
+ public FederationQueue(String queuename, SubClusterId subClusterId,
+ Resource guar, Resource max, Resource used, Resource pending) {
+ this.conf = new Configuration();
+ this.rc =
+ ReflectionUtils.newInstance(
+ conf.getClass(RESOURCE_CALCULATOR_CLASS,
+ DEFAULT_RESOURCE_CALCULATOR_CLASS, ResourceCalculator.class),
+ conf);
+ this.queueName = queuename;
+ this.subClusterId = subClusterId;
+ this.guarCap = guar;
+ this.maxCap = max;
+ this.usedCap = used;
+ this.demandCap = pending;
+ this.totCap = Resources.clone(guar);
+ this.children = new HashMap<>();
+ }
+
+ /**
+ * This method propagates from leaf to root all metrics, and pushes down the
+ * total capacity from the root.
+ */
+ public void propagateCapacities() {
+ rollDownCapacityFromRoot(totCap);
+ rollUpMetricsFromChildren();
+ }
+
+ private void rollDownCapacityFromRoot(Resource rootCap) {
+ totCap = rootCap;
+ for (FederationQueue c: children.values()) {
+ c.rollDownCapacityFromRoot(rootCap);
+ }
+ }
+
+ /**
+  * Recomputes, bottom-up, the metrics of every queue that has children:
+  * guaranteed, used and demand capacity become the sums over the children,
+  * and max capacity is derived from the greatest child max capacity (as
+  * ranked by the resource calculator). Queues without children keep their
+  * own values. Unset (null) child capacities are skipped.
+  */
+ private void rollUpMetricsFromChildren() {
+   Resource childGuar = Resources.createResource(0L, 0);
+   Resource childMax = Resources.createResource(0L, 0);
+   Resource childUsed = Resources.createResource(0L, 0);
+   Resource childDem = Resources.createResource(0L, 0);
+
+   // this pulls the leaf data up: each child is refreshed before being read
+   for (FederationQueue c : children.values()) {
+     c.rollUpMetricsFromChildren();
+     if (c.getGuarCap() != null) {
+       Resources.addTo(childGuar, c.getGuarCap());
+     }
+     if (c.getMaxCap() != null) {
+       childMax = Resources.max(rc, totCap, childMax, c.getMaxCap());
+     }
+     if (c.getUsedCap() != null) {
+       Resources.addTo(childUsed, c.getUsedCap());
+     }
+     if (c.getDemandCap() != null) {
+       Resources.addTo(childDem, c.getDemandCap());
+     }
+   }
+   if (!children.isEmpty()) {
+     setGuarCap(childGuar);
+     // max capacity is at least the summed guarantees of the children and
+     // at most the total capacity
+     setMaxCap(Resources.componentwiseMin(
+         Resources.componentwiseMax(childMax, childGuar), totCap));
+     setUsedCap(childUsed);
+     setDemandCap(childDem);
+   }
+ }
+
+ /**
+  * This method checks that certain queue invariants are respected, for this
+  * queue and, recursively, for each of its children.
+  *
+  * @throws FederationGlobalQueueValidationException upon violation.
+  */
+ public void validate() throws FederationGlobalQueueValidationException {
+
+   if (totCap == null) {
+     throw new FederationGlobalQueueValidationException(
+         "Total capacity must be configured");
+   }
+
+   // suffix shared by every message below
+   final String forQueue =
+       " for queue " + this.getQueueName() + "@" + this.getSubClusterId();
+
+   // This only rejects a usedCap below Resources.none().
+   // NOTE(review): whether usedCap must also fit in totCap is not checked
+   // anywhere in this method -- confirm the intended invariant.
+   if (Resources.lessThan(rc, totCap, usedCap, Resources.none())) {
+     throw new FederationGlobalQueueValidationException(
+         "usedCap (" + usedCap + ") is outside [0,+inf] range" + forQueue);
+   }
+
+   if (!Resources.fitsIn(guarCap, totCap)) {
+     throw new FederationGlobalQueueValidationException(
+         "guarCap (" + guarCap + ") exceeds total capacity (" + totCap + ")"
+             + forQueue);
+   }
+
+   if (Resources.lessThan(rc, totCap, guarCap, Resources.none())) {
+     throw new FederationGlobalQueueValidationException(
+         "guarCap (" + guarCap + ") is outside [0,+inf] range" + forQueue);
+   }
+
+   if (!Resources.fitsIn(guarCap, maxCap)) {
+     throw new FederationGlobalQueueValidationException("maxCap (" + maxCap
+         + ") is outside [" + guarCap + ",+inf] range" + forQueue);
+   }
+
+   // a usedCap below Resources.none() has already been rejected above
+   if (!Resources.fitsIn(usedCap, maxCap)) {
+     throw new FederationGlobalQueueValidationException("usedCap (" + usedCap
+         + ") is outside [0," + maxCap + "] range" + forQueue);
+   }
+
+   if (Resources.lessThan(rc, totCap, demandCap, Resources.none())) {
+     throw new FederationGlobalQueueValidationException(
+         "demandCap (" + demandCap + ") is outside [0,+inf] range"
+             + forQueue);
+   }
+
+   if (idealAlloc != null && !Resources.fitsIn(idealAlloc, totCap)) {
+     throw new FederationGlobalQueueValidationException(
+         "idealAlloc (" + idealAlloc + ") is greater than totCap (" + totCap
+             + ")" + forQueue);
+   }
+
+   if (children != null && !children.isEmpty()) {
+     Resource childGuar = Resources.createResource(0L, 0);
+     Resource childMax = Resources.createResource(0L, 0);
+     Resource childUsed = Resources.createResource(0L, 0);
+     Resource childDem = Resources.createResource(0L, 0);
+     Resource childIdealAlloc = Resources.createResource(0, 0);
+
+     for (FederationQueue c : children.values()) {
+       Resources.addTo(childGuar, c.getGuarCap());
+       Resources.addTo(childUsed, c.getUsedCap());
+       Resources.addTo(childDem, c.getDemandCap());
+       if (c.idealAlloc != null) {
+         Resources.addTo(childIdealAlloc, c.getIdealAlloc());
+       }
+       // NOTE(review): childMax is never accumulated from the children, so
+       // this compares an all-zero resource against maxCap. Confirm the
+       // intended children/parent maxCap invariant before tightening it.
+       if (!Resources.lessThanOrEqual(rc, totCap, childMax, maxCap)) {
+         throw new FederationGlobalQueueValidationException(
+             "Sum of children maxCap (" + childMax
+                 + ") mismatched with parent maxCap (" + maxCap + ")"
+                 + forQueue);
+       }
+
+       c.validate();
+     }
+
+     if (!Resources.equals(childGuar, guarCap)) {
+       throw new FederationGlobalQueueValidationException(
+           "Sum of children guarCap (" + childGuar
+               + ") mismatched with parent guarCap (" + guarCap + ")"
+               + forQueue);
+     }
+
+     if (!Resources.equals(childUsed, usedCap)) {
+       throw new FederationGlobalQueueValidationException(
+           "Sum of children usedCap (" + childUsed
+               + ") mismatched with parent usedCap (" + usedCap + ")"
+               + forQueue);
+     }
+
+     if (!Resources.equals(childDem, demandCap)) {
+       // report the summed demand (not the summed guarantee) of the children
+       throw new FederationGlobalQueueValidationException(
+           "Sum of children demandCap (" + childDem
+               + ") mismatched with parent demandCap (" + demandCap + ")"
+               + forQueue);
+     }
+
+     if (idealAlloc != null
+         && !Resources.fitsIn(childIdealAlloc, idealAlloc)) {
+       throw new FederationGlobalQueueValidationException(
+           "Sum of children idealAlloc (" + childIdealAlloc
+               + ") exceed the parent idealAlloc (" + idealAlloc + ")"
+               + forQueue);
+     }
+   }
+ }
+
+ /**
+  * This method clones the FederationQueue. Name, type, sub-cluster id,
+  * configuration and resource calculator are carried over; every capacity is
+  * copied, and a capacity that is not set (null) stays null in the copy.
+  * The totalUnassigned value is not copied.
+  *
+  * @param recursive whether to clone the children recursively; when false
+  *          the copy gets its own children map pointing at the same child
+  *          objects.
+  * @return cloned object of Federation Queue.
+  */
+ public FederationQueue clone(boolean recursive) {
+   FederationQueue metoo = new FederationQueue(this.conf, rc);
+   metoo.queueName = queueName;
+   metoo.queueType = queueType;
+   metoo.subClusterId = subClusterId;
+   // capacities may legitimately be unset (e.g. on a queue built with the
+   // (conf, rc) constructor), so copy them null-safely
+   metoo.totCap = cloneOrNull(totCap);
+   metoo.guarCap = cloneOrNull(guarCap);
+   metoo.maxCap = cloneOrNull(maxCap);
+   metoo.usedCap = cloneOrNull(usedCap);
+   metoo.demandCap = cloneOrNull(demandCap);
+   metoo.idealAlloc = cloneOrNull(idealAlloc);
+   metoo.toBePreempted = cloneOrNull(toBePreempted);
+   metoo.testExpectedIdealAlloc = cloneOrNull(testExpectedIdealAlloc);
+   for (Map.Entry<String, FederationQueue> c : children.entrySet()) {
+     FederationQueue child = c.getValue();
+     metoo.children.put(c.getKey(), recursive ? child.clone(true) : child);
+   }
+   return metoo;
+ }
+
+ /** Copies the given resource, or returns null when it is null. */
+ private static Resource cloneOrNull(Resource res) {
+   return (res != null) ? Resources.clone(res) : null;
+ }
+
+ /**
+  * This operation combines every level of the queues and produces a merged
+  * tree. The first queue of the list provides the structure: the capacities
+  * of the following queues are added to it, and children are merged by name.
+  *
+  * @param subClusterQueues the input queues to merge; at least one queue is
+  *          required
+  * @param newScope the sub-cluster id assigned to the merged queues
+  * @return the root of the merged FederationQueue tree
+  * @throws IllegalArgumentException if subClusterQueues is null or empty
+  */
+ public static FederationQueue mergeQueues(
+     List<FederationQueue> subClusterQueues, SubClusterId newScope) {
+
+   if (subClusterQueues == null || subClusterQueues.isEmpty()) {
+     throw new IllegalArgumentException(
+         "At least one queue is required to build a merged queue");
+   }
+
+   FederationQueue combined = null;
+
+   for (FederationQueue root : subClusterQueues) {
+     if (combined == null) {
+       // the first queue seeds the merged tree
+       combined = root.clone(false);
+       combined.setSubClusterId(newScope);
+       continue;
+     }
+     combined.setTotCap(Resources
+         .clone(Resources.add(combined.getTotCap(), root.getTotCap())));
+     combined.setGuarCap(Resources
+         .clone(Resources.add(combined.getGuarCap(), root.getGuarCap())));
+     combined.setMaxCap(
+         Resources.clone(Resources.componentwiseMax(combined.getTotCap(),
+             Resources.add(combined.getMaxCap(), root.getMaxCap()))));
+     combined.setUsedCap(Resources
+         .clone(Resources.add(combined.getUsedCap(), root.getUsedCap())));
+     combined.setDemandCap(Resources
+         .clone(Resources.add(combined.getDemandCap(), root.getDemandCap())));
+
+     // NOTE(review): only the children already present in the merged tree
+     // are visited; a child that exists solely in a later queue of the list
+     // is not added -- confirm this is intended.
+     Map<String, FederationQueue> newChildren = new HashMap<>();
+     for (Map.Entry<String, FederationQueue> mychild :
+         combined.children.entrySet()) {
+       FederationQueue theirchild = root.getChildren().get(mychild.getKey());
+       List<FederationQueue> mergelist = new ArrayList<>();
+       mergelist.add(mychild.getValue());
+       // the queue may not exist in this sub-cluster: merge what is there
+       // rather than passing a null entry down the recursion
+       if (theirchild != null) {
+         mergelist.add(theirchild);
+       }
+       newChildren.put(mychild.getKey(), mergeQueues(mergelist, newScope));
+     }
+     combined.children = newChildren;
+   }
+
+   combined.propagateCapacities();
+   return combined;
+ }
+
+ /**
+ * Get child FederationQueue by name.
+ * @param queueName name of the queue.
+ * @return children FederationQueue.
+ */
+ public FederationQueue getChildByName(String queueName) {
+ return recursiveChildByName(this, queueName);
+ }
+
+ /**
+  * Searches the tree rooted at f for a queue called a: f itself is checked
+  * first, then its direct children (by map key), then each child subtree.
+  * Returns null when f is null or when nothing matches.
+  */
+ private static FederationQueue recursiveChildByName(FederationQueue f,
+     String a) {
+   if (f == null) {
+     return null;
+   }
+
+   String ownName = f.getQueueName();
+   if (ownName != null && ownName.equals(a)) {
+     return f;
+   }
+
+   FederationQueue directChild = f.getChildren().get(a);
+   if (directChild != null) {
+     return directChild;
+   }
+
+   for (FederationQueue subTree : f.getChildren().values()) {
+     FederationQueue found = recursiveChildByName(subTree, a);
+     if (found != null) {
+       return found;
+     }
+   }
+   return null;
+ }
+
+ /**
+ * Sets total capacity for FederationQueue and children.
+ * @param totCapacity total capacity of the queue.
+ */
+ public void recursiveSetOfTotCap(Resource totCapacity) {
+ this.setTotCap(totCapacity);
+ for (FederationQueue child : this.getChildren().values()) {
+ child.recursiveSetOfTotCap(totCapacity);
+ }
+ }
+
+ /**
+ * Get the queue total unassigned resources.
+ * @return queue unassigned resources.
+ */
+ public Resource getTotalUnassigned() {
+ return totalUnassigned;
+ }
+
+ /**
+ * Set the queue total unassigned resources.
+ * @param totalUnassigned queue totalUnassigned resources.
+ */
+ public void setTotalUnassigned(Resource totalUnassigned) {
+ this.totalUnassigned = totalUnassigned;
+ }
+
+ /**
+ * Get the queue configuration
+ * @return queue configuration
+ */
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Set the queue configuration
+ * @param conf queue configuration
+ */
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Get queue guaranteed capacity
+ * @return queue guaranteed capacity
+ */
+ public Resource getGuarCap() {
+ return guarCap;
+ }
+
+ /**
+ * Set queue guaranteed capacity
+ * @param guarCap queue guaranteed capacity
+ */
+ public void setGuarCap(Resource guarCap) {
+ this.guarCap = guarCap;
+ }
+
+ /**
+ * Get queue max capacity
+ * @return queue max capacity
+ */
+ public Resource getMaxCap() {
+ return maxCap;
+ }
+
+ /**
+ * Set queue max capacity
+ * @param maxCap max capacity of the queue
+ */
+ public void setMaxCap(Resource maxCap) {
+ this.maxCap = maxCap;
+ }
+
+ /**
+ * Get queue used capacity
+ * @return queue used capacity
+ */
+ public Resource getUsedCap() {
+ return usedCap;
+ }
+
+ /**
+ * Set queue used capacity
+ * @param usedCap queue used capacity
+ */
+ public void setUsedCap(Resource usedCap) {
+ this.usedCap = usedCap;
+ }
+
+ /**
+ * Get queue demand capacity
+ * @return queue demand capacity
+ */
+ public Resource getDemandCap() {
+ return demandCap;
+ }
+
+ /**
+ * Set queue demand capacity
+ * @param demandCap queue demand capacity
+ */
+ public void setDemandCap(Resource demandCap) {
+ this.demandCap = demandCap;
+ }
+
+ /**
+ * Get queue children
+ * @return queue children
+ */
+ public Map<String, FederationQueue> getChildren() {
+ return children;
+ }
+
+ /**
+ * Set queue children
+ * @param children queue children
+ */
+ public void setChildren(Map<String, FederationQueue> children) {
+ this.children = children;
+ }
+
+ /**
+ * Get queue name
+ * @return queue name
+ */
+ public String getQueueName() {
+ return queueName;
+ }
+
+ /**
+ * Get queue total capacity
+ * @return queue total capacity
+ */
+ public Resource getTotCap() {
+ return totCap;
+ }
+
+ /**
+ * Set queue total capacity
+ * @param totCap queue total capacity
+ */
+ public void setTotCap(Resource totCap) {
+ this.totCap = totCap;
+ }
+
+ /**
+ * Set queue name
+ * @param queueName queue name
+ */
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+
+ /**
+ * Get queue ideal allocation
+ * @return queue ideal allocation
+ */
+ public Resource getIdealAlloc() {
+ return idealAlloc;
+ }
+
+ /**
+ * Set queue ideal allocation
+ * @param idealAlloc queue ideal allocation
+ */
+ public void setIdealAlloc(Resource idealAlloc) {
+ this.idealAlloc = idealAlloc;
+ }
+
+ /**
+ * Get queue resources to be preempted
+ * @return queue resources to be preempted
+ */
+ public Resource getToBePreempted() {
+ return toBePreempted;
+ }
+
+ /**
+ * Set queue resources to be preempted
+ * @param toBePreempted queue resources to be preempted
+ */
+ public void setToBePreempted(Resource toBePreempted) {
+ this.toBePreempted = toBePreempted;
+ }
+
+ /**
+ * Get queue subcluster id
+ * @return queue subcluster id
+ */
+ public SubClusterId getSubClusterId() {
+ return subClusterId;
+ }
+
+ /**
+ * Set queue subcluster id
+ * @param subClusterId queue subcluster id
+ */
+ public void setSubClusterId(SubClusterId subClusterId) {
+ this.subClusterId = subClusterId;
+ }
+
+ /**
+ * Set queue type
+ * @param queueType queue type
+ */
+ public void setQueueType(String queueType) {
+ this.queueType = queueType;
+ }
+
+ /**
+ * Get queue type
+ * @return queue type
+ */
+ public String getQueueType() {
+ return queueType;
+ }
+
+ /**
+ * Get queue expected ideal allocation
+ * @return queue ideal allocation
+ */
+ public Resource getExpectedIdealAlloc() {
+ return testExpectedIdealAlloc;
+ }
+
+ public String toString() {
+ return toQuickString();
+ }
+
+ /**
+ * Produces a quick String representation of the queue rooted at this node.
+ * Good for printing.
+ */
+ public String toQuickString() {
+ return this.appendToSB(new StringBuilder(), 0).toString();
+ }
+
+ /**
+  * Append queue hierarchy rooted at this node to the given StringBuilder.
+  * Each queue goes on its own line, indented with one tab per level; the
+  * sub-cluster id is only printed for the node at depth 0. A capacity that
+  * is not set is printed as "null", so that a queue that failed validation
+  * because of a missing capacity can still be dumped.
+  *
+  * @param sb the builder to append to
+  * @param depth the depth of this node in the printed hierarchy
+  * @return the same builder, for chaining
+  */
+ private StringBuilder appendToSB(StringBuilder sb, int depth) {
+   sb.append("\n").append(String.join("", Collections.nCopies(depth, "\t")))
+       .append(queueName);
+   if (depth == 0) {
+     sb.append("(").append(subClusterId).append(")");
+   }
+   appendCap(sb, " [g: ", guarCap);
+   appendCap(sb, ", m: ", maxCap);
+   appendCap(sb, ", u: ", usedCap);
+   appendCap(sb, ", d: ", demandCap);
+   appendCap(sb, ", t: ", totCap);
+   if (idealAlloc != null) {
+     appendCap(sb, ", i: ", idealAlloc);
+   }
+   sb.append("]");
+   if (children != null && !children.isEmpty()) {
+     children.values().forEach(c -> c.appendToSB(sb, depth + 1));
+   }
+   return sb;
+ }
+
+ /** Appends label followed by "memory/vcores" of cap, or "null" if unset. */
+ private static void appendCap(StringBuilder sb, String label,
+     Resource cap) {
+   sb.append(label);
+   if (cap == null) {
+     sb.append("null");
+   } else {
+     sb.append(cap.getMemorySize()).append("/")
+         .append(cap.getVirtualCores());
+   }
+ }
+
+ /**
+  * Counts the queues of the tree rooted at this queue: this queue itself
+  * plus all the queues below it.
+  *
+  * @return number of queues in the tree rooted at this queue
+  */
+ public long countQueues() {
+   long below = getChildren().values().stream()
+       .mapToLong(FederationQueue::countQueues)
+       .sum();
+   return 1 + below;
+ }
+
+ /**
+ * Checks whether queue is leaf queue
+ * @return is queue leaf queue
+ */
+ public boolean isLeaf() {
+ return this.getChildren() == null || this.getChildren().isEmpty();
+ }
+
+ /**
+ * Get queue number of children
+ * @return number of queue children
+ */
+ public int childrenNum() {
+ return this.getChildren() != null ? this.getChildren().size() : 0;
+ }
+
+ /**
+ * True if the sum of used and pending resources for this queue are smaller
+ * than the guaranteed resources.
+ */
+ public boolean isUnderutilized() {
+ return Resources.fitsIn(
+ Resources.add(this.getUsedCap(), this.getDemandCap()),
+ this.getGuarCap());
+ }
+
+ /**
+ * Return a stream of the current FederationQueue (uses the FedQueueIterator).
+ */
+ public Stream<FederationQueue> stream() {
+ return StreamSupport.stream(this.spliterator(), false);
+ }
+
+ /**
+ * Stream all leaf nodes of the FederationQueue hierarchy.
+ */
+ public Stream<FederationQueue> streamLeafQs() {
+ return this.stream().filter(FederationQueue::isLeaf);
+ }
+
+ /**
+ * Stream all leaf nodes that have non-zero guaranteed capacity.
+ */
+ public Stream<FederationQueue> streamNonEmptyLeafQs() {
+ return this.streamLeafQs()
+ .filter(leafQ -> leafQ.getGuarCap().getMemorySize() > 0);
+ }
+
+ /**
+ * Stream all inner nodes of the FederationQueue hierarchy.
+ */
+ public Stream<FederationQueue> streamInnerQs() {
+ return this.stream().filter(q -> !q.isLeaf());
+ }
+
+ @Override
+ public Iterator<FederationQueue> iterator() {
+ return new FedQueueIterator(this);
+ }
+
+ /**
+ * Iterator for FederationQueue.
+ */
+ private static final class FedQueueIterator implements
+ Iterator<FederationQueue> {
+
+ private Deque<FederationQueue> state;
+ private FederationQueue crt;
+
+ FedQueueIterator(FederationQueue root) {
+ this.state = new ArrayDeque<>();
+ state.push(root);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !state.isEmpty();
+ }
+
+ @Override
+ public FederationQueue next() {
+ crt = state.pop();
+ if (crt.getChildren() != null && !crt.getChildren().isEmpty()) {
+ state.addAll(crt.getChildren().values());
+ }
+ return crt;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/717874a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/package-info.java
new file mode 100644
index 0000000..9d08818
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/717874a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/GlobalQueueTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/GlobalQueueTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/GlobalQueueTestUtil.java
new file mode 100644
index 0000000..731becd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/GlobalQueueTestUtil.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+/**
+ * This class provides support methods for all global queue tests.
+ */
+public final class GlobalQueueTestUtil {
+
+ private GlobalQueueTestUtil() {
+ }
+
+ public static List<FederationQueue> generateFedCluster() throws IOException {
+ int numSubclusters = 20;
+
+ List<FederationQueue> toMerge = new ArrayList<>();
+ String queueJson = loadFile("globalqueues/tree-queue.json");
+ for (int i = 0; i < numSubclusters; i++) {
+ FederationQueue temp = parseQueue(queueJson);
+ toMerge.add(temp);
+ }
+ return toMerge;
+ }
+
+ public static String loadFile(String filename) throws IOException {
+ ClassLoader cL = Thread.currentThread().getContextClassLoader();
+ if (cL == null) {
+ cL = Configuration.class.getClassLoader();
+ }
+ URL submitURI = cL.getResource(filename);
+
+ return FileUtils.readFileToString(new File(submitURI.getFile()),
+ Charset.defaultCharset());
+ }
+
+ public static String toJSONString(Resource resInf) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("{");
+ builder.append("\"memory\"");
+ builder.append(" : ");
+ builder.append(resInf.getMemorySize());
+ builder.append(", ");
+ builder.append("\"vCores\"");
+ builder.append(" : ");
+ builder.append(resInf.getVirtualCores());
+ builder.append("}");
+ return builder.toString();
+ }
+
+ public static FederationQueue parseQueue(String queueJson)
+ throws IOException {
+ JsonFactory jsonF = new JsonFactory();
+ ObjectMapper mapper = new ObjectMapper();
+ Map jsonMap = mapper.readValue(jsonF.createParser(queueJson), Map.class);
+ FederationQueue fq = parseFedQueue(jsonMap);
+ fq.propagateCapacities();
+ return fq;
+ }
+
+ private static FederationQueue parseFedQueue(Map jsonMap) {
+ FederationQueue fq = new FederationQueue(new Configuration());
+ fq.setQueueName(jsonMap.get("queueName").toString());
+ fq.setSubClusterId(SubClusterId.newInstance(
+ ((Map)(jsonMap.get("scope"))).get("id").toString()));
+ if (jsonMap.get("guarCap") != null) {
+ fq.setGuarCap(parseResource((Map)jsonMap.get("guarCap")));
+ }
+ if (jsonMap.get("maxCap") != null) {
+ fq.setMaxCap(parseResource((Map)jsonMap.get("maxCap")));
+ }
+ if (jsonMap.get("usedCap") != null) {
+ fq.setUsedCap(parseResource((Map)jsonMap.get("usedCap")));
+ }
+ if (jsonMap.get("totCap") != null) {
+ fq.setTotCap(parseResource((Map)jsonMap.get("totCap")));
+ }
+ if (jsonMap.get("demandCap") != null) {
+ fq.setDemandCap(parseResource((Map)jsonMap.get("demandCap")));
+ }
+ if (jsonMap.get("children") != null) {
+ List children = (List) jsonMap.get("children");
+ Map<String, FederationQueue> childrenFedQueue = new HashMap<>();
+ for (Object o : children) {
+ FederationQueue child = parseFedQueue((Map)(o));
+ childrenFedQueue.put(child.getQueueName(), child);
+ }
+ fq.setChildren(childrenFedQueue);
+ }
+ return fq;
+ }
+
+ private static Resource parseResource(Map resMap) {
+ Resource res = Resource.newInstance(0, 0);
+ if (resMap.get("memory") != null) {
+ res.setMemorySize(Integer.parseInt(resMap.get("memory").toString()));
+ }
+ if (resMap.get("vCores") != null) {
+ res.setVirtualCores(Integer.parseInt(resMap.get("vCores").toString()));
+ }
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/717874a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/TestFederationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/TestFederationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/TestFederationQueue.java
new file mode 100644
index 0000000..d20c631
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/globalqueues/TestFederationQueue.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues;
+
+import static org.apache.hadoop.yarn.server.globalpolicygenerator.globalqueues.GlobalQueueTestUtil.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides simple tests for the {@code FederationQueue} data class.
+ */
+public class TestFederationQueue {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationQueue.class);
+
+  /**
+   * Parses the static tree fixture and checks a few values that are written
+   * literally in {@code globalqueues/tree-queue.json}.
+   */
+  @Test
+  public void testParseQueue() throws Exception {
+    String queueJson = loadFile("globalqueues/tree-queue.json");
+    FederationQueue fq = parseQueue(queueJson);
+    fq.validate();
+    Assert.assertEquals("root", fq.getQueueName());
+    Assert.assertEquals(2, fq.getChildren().size());
+    Assert.assertEquals(100000, fq.getGuarCap().getMemorySize());
+
+    FederationQueue queueA = fq.getChildByName("A");
+    Assert.assertEquals(2, queueA.getChildren().size());
+    Assert.assertEquals(750, queueA.getUsedCap().getVirtualCores());
+  }
+
+  /**
+   * Merges the generated per-subcluster hierarchies and checks that the
+   * merged tree still validates after its capacities are propagated.
+   */
+  @Test
+  public void testMergeFedQueue() throws Exception {
+    List<FederationQueue> toMerge = generateFedCluster();
+
+    FederationQueue merged =
+        FederationQueue.mergeQueues(toMerge,
+            SubClusterId.newInstance("merged"));
+
+    merged.propagateCapacities();
+    merged.validate();
+    LOG.info(merged.toQuickString());
+  }
+
+  /**
+   * Same as the merge test, but the capacities of leaf queue A1 are
+   * injected into the {@code tree-queue-adaptable.json} template.
+   */
+  @Test
+  public void testPropagateFedQueue() throws Exception {
+
+    String queueTemplate = loadFile("globalqueues/tree-queue-adaptable.json");
+
+    int numSubclusters = 10;
+
+    Resource guar = Resource.newInstance(5 * 1024, 10);
+    Resource max = Resource.newInstance(8 * 1024, 10);
+    Resource used = Resource.newInstance(4 * 1024, 10);
+    Resource dem = Resource.newInstance(1 * 1024, 10);
+
+    // Expand the template once, into its own variable. Formatting inside the
+    // loop and assigning back to the template would leave no placeholders
+    // after the first round, so later rounds would silently ignore their
+    // arguments.
+    String queueJson = String.format(queueTemplate, "A1", toJSONString(guar),
+        toJSONString(max), toJSONString(used), toJSONString(dem));
+
+    List<FederationQueue> toMerge = new ArrayList<>();
+    for (int i = 0; i < numSubclusters; i++) {
+      FederationQueue temp = parseQueue(queueJson);
+      temp.propagateCapacities();
+      temp.validate();
+      toMerge.add(temp);
+    }
+
+    FederationQueue merged =
+        FederationQueue.mergeQueues(toMerge,
+            SubClusterId.newInstance("merged"));
+
+    merged.propagateCapacities();
+    merged.validate();
+
+    LOG.info(merged.toQuickString());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/717874a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/globalqueues/basic-queue.json
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/globalqueues/basic-queue.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/globalqueues/basic-queue.json
new file mode 100644
index 0000000..14f10f9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/globalqueues/basic-queue.json
@@ -0,0 +1,9 @@
+{
+ "queueName" : "%s",
+ "totCap" : %s,
+ "guarCap" : %s,
+ "maxCap" : %s,
+ "usedCap" : %s,
+ "demandCap" : %s,
+ "scope" : {"id" : "sc1"}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/717874a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/globalqueues/tree-queue-adaptable.json
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/globalqueues/tree-queue-adaptable.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/globalqueues/tree-queue-adaptable.json
new file mode 100644
index 0000000..b1cb43b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/globalqueues/tree-queue-adaptable.json
@@ -0,0 +1,96 @@
+{
+ "queueName": "root",
+ "scope" : {"id" : "sc1"},
+ "totCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "guarCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "maxCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "usedCap": {
+ "memory": "50000",
+ "vCores": "500"
+ },
+ "demandCap": {
+ "memory": "1000",
+ "vCores": "10"
+ },
+ "children": [
+ {
+ "queueName": "A",
+ "scope" : {"id" : "sc1"},
+ "guarCap": {
+ "memory": "50000",
+ "vCores": "500"
+ },
+ "maxCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "usedCap": {
+ "memory": "25000",
+ "vCores": "250"
+ },
+ "demandCap": {
+ "memory": "0",
+ "vCores": "0"
+ },
+ "children": [
+ {
+ "queueName": "%s",
+ "guarCap": %s,
+ "maxCap": %s,
+ "usedCap": %s,
+ "demandCap": %s,
+ "scope" : {"id" : "sc1"}
+ },
+ {
+ "queueName": "A2",
+ "scope" : {"id" : "sc1"},
+ "guarCap": {
+ "memory": "0",
+ "vCores": "0"
+ },
+ "maxCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "usedCap": {
+ "memory": "0",
+ "vCores": "0"
+ },
+ "demandCap": {
+ "memory": "0",
+ "vCores": "0"
+ }
+ }
+ ]
+ },
+ {
+ "queueName": "B",
+ "scope" : {"id" : "sc1"},
+ "guarCap": {
+ "memory": "50000",
+ "vCores": "500"
+ },
+ "maxCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "usedCap": {
+ "memory": "50000",
+ "vCores": "500"
+ },
+ "demandCap": {
+ "memory": "1000",
+ "vCores": "10"
+ }
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/717874a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/globalqueues/tree-queue.json
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/globalqueues/tree-queue.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/globalqueues/tree-queue.json
new file mode 100644
index 0000000..e3f2532
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/globalqueues/tree-queue.json
@@ -0,0 +1,128 @@
+{
+ "queueName": "root",
+ "scope" : {"id" : "sc1"},
+ "totCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "guarCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "maxCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "usedCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "demandCap": {
+ "memory": "1000",
+ "vCores": "10"
+ },
+ "testExpectedToBePreempted": {
+ "memory": "0",
+ "vCores": "0"
+ },
+ "children": [
+ {
+ "queueName": "A",
+ "scope" : {"id" : "sc1"},
+ "guarCap": {
+ "memory": "50000",
+ "vCores": "500"
+ },
+ "maxCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "usedCap": {
+ "memory": "75000",
+ "vCores": "750"
+ },
+ "demandCap": {
+ "memory": "0",
+ "vCores": "0"
+ },
+ "testExpectedToBePreempted": {
+ "memory": "10000",
+ "vCores": "100"
+ },
+ "children": [
+ {
+ "queueName": "A1",
+ "scope" : {"id" : "sc1"},
+ "guarCap": {
+ "memory": "25000",
+ "vCores": "250"
+ },
+ "maxCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "usedCap": {
+ "memory": "50000",
+ "vCores": "500"
+ },
+ "demandCap": {
+ "memory": "0",
+ "vCores": "0"
+ },
+ "testExpectedToBePreempted": {
+ "memory": "10000",
+ "vCores": "100"
+ }
+ },
+ {
+ "queueName": "A2",
+ "scope" : {"id" : "sc1"},
+ "guarCap": {
+ "memory": "25000",
+ "vCores": "250"
+ },
+ "maxCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "usedCap": {
+ "memory": "25000",
+ "vCores": "250"
+ },
+ "demandCap": {
+ "memory": "0",
+ "vCores": "0"
+ },
+ "testExpectedToBePreempted": {
+ "memory": "0",
+ "vCores": "0"
+ }
+ }
+ ]
+ },
+ {
+ "queueName": "B",
+ "scope" : {"id" : "sc1"},
+ "guarCap": {
+ "memory": "50000",
+ "vCores": "500"
+ },
+ "maxCap": {
+ "memory": "100000",
+ "vCores": "1000"
+ },
+ "usedCap": {
+ "memory": "25000",
+ "vCores": "250"
+ },
+ "demandCap": {
+ "memory": "10000",
+ "vCores": "100"
+ },
+ "testExpectedToBePreempted": {
+ "memory": "0",
+ "vCores": "0"
+ }
+ }
+ ]
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org