You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2022/03/09 18:36:30 UTC

[hadoop] branch trunk updated: YARN-10918. Simplify method: CapacitySchedulerQueueManager#parseQueue. Contributed by Andras Gyori

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new db8ae4b  YARN-10918. Simplify method: CapacitySchedulerQueueManager#parseQueue. Contributed by Andras Gyori
db8ae4b is described below

commit db8ae4b65448c506c9234641b2c1f9b8e894dc18
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Wed Mar 9 19:36:09 2022 +0100

    YARN-10918. Simplify method: CapacitySchedulerQueueManager#parseQueue. Contributed by Andras Gyori
---
 .../scheduler/capacity/CSQueue.java                |   7 ++
 .../capacity/CapacitySchedulerQueueManager.java    | 116 ++++++++-------------
 .../scheduler/capacity/PlanQueue.java              |  19 ++++
 .../scheduler/capacity/ReservationQueue.java       |   6 ++
 4 files changed, 74 insertions(+), 74 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 90cb4f3..e2aeaab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -95,6 +95,13 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
    */
   QueuePath getQueuePathObject();
 
+  /**
+   * Checks whether the queue is a dynamic queue (created dynamically in the fashion of auto queue
+   * creation v2).
+   * @return true, if it is a dynamic queue, false otherwise
+   */
+  boolean isDynamicQueue();
+
   public PrivilegedEntity getPrivilegedEntity();
 
   Resource getMaximumAllocation();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index 50e8d52..29e5de0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -43,13 +43,11 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.Permission;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 
 import org.apache.hadoop.classification.VisibleForTesting;
@@ -231,99 +229,62 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
    * @throws IOException
    */
   static CSQueue parseQueue(
-      CapacitySchedulerQueueContext queueContext,
-      CapacitySchedulerConfiguration conf,
-      CSQueue parent, String queueName,
-      CSQueueStore newQueues,
-      CSQueueStore oldQueues,
+      CapacitySchedulerQueueContext queueContext, CapacitySchedulerConfiguration conf,
+      CSQueue parent, String queueName, CSQueueStore newQueues, CSQueueStore oldQueues,
       QueueHook hook) throws IOException {
     CSQueue queue;
-    String fullQueueName = (parent == null) ?
-        queueName :
-        (parent.getQueuePath() + "." + queueName);
+    String fullQueueName = (parent == null) ? queueName :
+        (QueuePath.createFromQueues(parent.getQueuePath(), queueName).getFullPath());
     String[] staticChildQueueNames = conf.getQueues(fullQueueName);
     List<String> childQueueNames = staticChildQueueNames != null ?
         Arrays.asList(staticChildQueueNames) : Collections.emptyList();
-
-    boolean isReservableQueue = conf.isReservable(fullQueueName);
-    boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(
-        fullQueueName);
-    // if a queue is eligible for auto queue creation v2
-    // it must be a ParentQueue (even if it is empty)
-    boolean isAutoQueueCreationV2Enabled = conf.isAutoQueueCreationV2Enabled(
-        fullQueueName);
-    boolean isDynamicParent = false;
-
-    // Auto created parent queues might not have static children, but they
-    // must be kept as a ParentQueue
     CSQueue oldQueue = oldQueues.get(fullQueueName);
-    if (oldQueue instanceof ParentQueue) {
-      isDynamicParent = ((ParentQueue) oldQueue).isDynamicQueue();
-    }
 
-    if (childQueueNames.size() == 0 && !isDynamicParent &&
-        !isAutoQueueCreationV2Enabled) {
-      if (null == parent) {
-        throw new IllegalStateException(
-            "Queue configuration missing child queue names for " + queueName);
-      }
-      // Check if the queue will be dynamically managed by the Reservation
-      // system
+    boolean isReservableQueue = conf.isReservable(fullQueueName);
+    boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(fullQueueName);
+    // A queue that is a dynamic parent, or has auto child queue creation or auto queue creation
+    // v2 enabled, must be a ParentQueue (even if it has no static children)
+    final boolean isDynamicParent = oldQueue instanceof ParentQueue && oldQueue.isDynamicQueue();
+    boolean isAutoQueueCreationEnabledParent = isDynamicParent || conf.isAutoQueueCreationV2Enabled(
+        fullQueueName) || isAutoCreateEnabled;
+
+    if (childQueueNames.size() == 0 && !isAutoQueueCreationEnabledParent) {
+      validateParent(parent, queueName);
+      // Check if the queue will be dynamically managed by the Reservation system
       if (isReservableQueue) {
-        queue = new PlanQueue(queueContext, queueName, parent,
-            oldQueues.get(fullQueueName));
-
-        //initializing the "internal" default queue, for SLS compatibility
-        String defReservationId =
-            queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
-
-        List<CSQueue> childQueues = new ArrayList<>();
-        ReservationQueue resQueue = new ReservationQueue(queueContext,
-            defReservationId, (PlanQueue) queue);
-        try {
-          resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f));
-        } catch (SchedulerDynamicEditException e) {
-          throw new IllegalStateException(e);
-        }
-        childQueues.add(resQueue);
-        ((PlanQueue) queue).setChildQueues(childQueues);
-        newQueues.add(resQueue);
-
-      } else if (isAutoCreateEnabled) {
-        queue = new ManagedParentQueue(queueContext, queueName, parent,
-            oldQueues.get(fullQueueName));
-
-      } else{
-        queue = new LeafQueue(queueContext, queueName, parent,
-            oldQueues.get(fullQueueName));
-        // Used only for unit tests
-        queue = hook.hook(queue);
+        queue = new PlanQueue(queueContext, queueName, parent, oldQueues.get(fullQueueName));
+        ReservationQueue defaultResQueue = ((PlanQueue) queue).initializeDefaultInternalQueue();
+        newQueues.add(defaultResQueue);
+      } else {
+        queue = new LeafQueue(queueContext, queueName, parent, oldQueues.get(fullQueueName));
       }
-    } else{
+
+      queue = hook.hook(queue);
+    } else {
       if (isReservableQueue) {
-        throw new IllegalStateException(
-            "Only Leaf Queues can be reservable for " + fullQueueName);
+        throw new IllegalStateException("Only Leaf Queues can be reservable for " + fullQueueName);
       }
 
       ParentQueue parentQueue;
       if (isAutoCreateEnabled) {
-        parentQueue = new ManagedParentQueue(queueContext, queueName, parent,
-            oldQueues.get(fullQueueName));
-      } else{
-        parentQueue = new ParentQueue(queueContext, queueName, parent,
-            oldQueues.get(fullQueueName));
+        parentQueue = new ManagedParentQueue(queueContext, queueName, parent, oldQueues.get(
+            fullQueueName));
+      } else {
+        parentQueue = new ParentQueue(queueContext, queueName, parent, oldQueues.get(
+            fullQueueName));
       }
 
-      // Used only for unit tests
       queue = hook.hook(parentQueue);
-
       List<CSQueue> childQueues = new ArrayList<>();
       for (String childQueueName : childQueueNames) {
-        CSQueue childQueue = parseQueue(queueContext, conf, queue, childQueueName,
-            newQueues, oldQueues, hook);
+        CSQueue childQueue = parseQueue(queueContext, conf, queue, childQueueName, newQueues,
+            oldQueues, hook);
         childQueues.add(childQueue);
       }
-      parentQueue.setChildQueues(childQueues);
+
+      if (!childQueues.isEmpty()) {
+        parentQueue.setChildQueues(childQueues);
+      }
 
     }
 
@@ -721,4 +682,11 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
     // that existingQueues contain valid dynamic queues.
     return !isDynamicQueue(parent);
   }
+
+  private static void validateParent(CSQueue parent, String queueName) {
+    if (parent == null) {
+      throw new IllegalStateException("Queue configuration missing child queue names for "
+          + queueName);
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index cca46f5..3bbbffe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.io.IOException;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,6 +125,23 @@ public class PlanQueue extends AbstractManagedParentQueue {
     }
   }
 
+  public ReservationQueue initializeDefaultInternalQueue() throws IOException {
+    // Initialize the "internal" default reservation queue, for SLS compatibility
+    String defReservationId =
+        getQueueName() + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
+
+    ReservationQueue resQueue = new ReservationQueue(queueContext,
+        defReservationId, this);
+    try {
+      resQueue.initializeEntitlements();
+    } catch (SchedulerDynamicEditException e) {
+      throw new IllegalStateException(e);
+    }
+    childQueues.add(resQueue);
+
+    return resQueue;
+  }
+
   private void updateQuotas(float newUserLimit, float newUserLimitFactor,
       int newMaxAppsForReservation, int newMaxAppsPerUserForReservation) {
     this.userLimit = newUserLimit;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
index 7b3144b..37cf425 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,6 +76,10 @@ public class ReservationQueue extends AbstractAutoCreatedLeafQueue {
     }
   }
 
+  public void initializeEntitlements() throws SchedulerDynamicEditException {
+    setEntitlement(new QueueEntitlement(1.0f, 1.0f));
+  }
+
   private void updateQuotas(float userLimit, float userLimitFactor,
       int maxAppsForReservation, int maxAppsPerUserForReservation) {
     setUserLimit(userLimit);

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org