You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2022/03/03 15:44:26 UTC
[hadoop] branch trunk updated: YARN-10947. Simplify AbstractCSQueue#initializeQueueState. Contributed by Andras Gyori
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 379baa5 YARN-10947. Simplify AbstractCSQueue#initializeQueueState. Contributed by Andras Gyori
379baa5 is described below
commit 379baa5eb65a65df50461f337b88ff13f2134aeb
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Thu Mar 3 16:44:12 2022 +0100
YARN-10947. Simplify AbstractCSQueue#initializeQueueState. Contributed by Andras Gyori
---
.../scheduler/capacity/AbstractCSQueue.java | 56 +------------
.../scheduler/capacity/CSQueueUtils.java | 14 +---
.../scheduler/capacity/QueueStateHelper.java | 98 ++++++++++++++++++++++
3 files changed, 101 insertions(+), 67 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index e924932..b0ab336 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -349,7 +349,7 @@ public abstract class AbstractCSQueue implements CSQueue {
// Initialize the queue state based on previous state, configured state
// and its parent state
- initializeQueueState();
+ QueueStateHelper.setQueueState(this);
authorizer = YarnAuthorizationProvider.getInstance(configuration);
@@ -553,60 +553,6 @@ public abstract class AbstractCSQueue implements CSQueue {
return configuredCapacityVectors.get(label);
}
- private void initializeQueueState() {
- QueueState previousState = getState();
- QueueState configuredState = queueContext.getConfiguration()
- .getConfiguredState(getQueuePath());
- QueueState parentState = (parent == null) ? null : parent.getState();
-
- // verify that we can not any value for State other than RUNNING/STOPPED
- if (configuredState != null && configuredState != QueueState.RUNNING
- && configuredState != QueueState.STOPPED) {
- throw new IllegalArgumentException("Invalid queue state configuration."
- + " We can only use RUNNING or STOPPED.");
- }
- // If we did not set state in configuration, use Running as default state
- QueueState defaultState = QueueState.RUNNING;
-
- if (previousState == null) {
- // If current state of the queue is null, we would inherit the state
- // from its parent. If this queue does not has parent, such as root queue,
- // we would use the configured state.
- if (parentState == null) {
- updateQueueState((configuredState == null) ? defaultState
- : configuredState);
- } else {
- if (configuredState == null) {
- updateQueueState((parentState == QueueState.DRAINING) ?
- QueueState.STOPPED : parentState);
- } else if (configuredState == QueueState.RUNNING
- && parentState != QueueState.RUNNING) {
- throw new IllegalArgumentException(
- "The parent queue:" + parent.getQueuePath()
- + " cannot be STOPPED as the child queue:" + getQueuePath()
- + " is in RUNNING state.");
- } else {
- updateQueueState(configuredState);
- }
- }
- } else {
- // when we get a refreshQueue request from AdminService,
- if (previousState == QueueState.RUNNING) {
- if (configuredState == QueueState.STOPPED) {
- stopQueue();
- }
- } else {
- if (configuredState == QueueState.RUNNING) {
- try {
- activateQueue();
- } catch (YarnException ex) {
- throw new IllegalArgumentException(ex.getMessage());
- }
- }
- }
- }
- }
-
protected QueueInfo getQueueInfo() {
// Deliberately doesn't use lock here, because this method will be invoked
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 244bb62..c6d50a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -19,11 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.Set;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.util.Sets;
+import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -302,15 +303,4 @@ public class CSQueueUtils {
}
}
}
-
- public static ApplicationPlacementContext extractQueuePath(String queuePath) {
- int parentQueueNameEndIndex = queuePath.lastIndexOf(".");
- if (parentQueueNameEndIndex > -1) {
- String parent = queuePath.substring(0, parentQueueNameEndIndex).trim();
- String leaf = queuePath.substring(parentQueueNameEndIndex + 1).trim();
- return new ApplicationPlacementContext(leaf, parent);
- } else{
- return new ApplicationPlacementContext(queuePath);
- }
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueStateHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueStateHelper.java
new file mode 100644
index 0000000..5ec7d01
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueStateHelper.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.util.Set;
+
+/**
+ * Collects all logic that are handling queue state transitions.
+ */
+public final class QueueStateHelper {
+ private static final Set<QueueState> VALID_STATE_CONFIGURATIONS = ImmutableSet.of(
+ QueueState.RUNNING, QueueState.STOPPED);
+ private static final QueueState DEFAULT_STATE = QueueState.RUNNING;
+
+ private QueueStateHelper() {}
+
+ /**
+ * Sets the current state of the queue based on its previous state, its parent's state and its
+ * configured state.
+ * @param queue the queue whose state is set
+ */
+ public static void setQueueState(AbstractCSQueue queue) {
+ QueueState previousState = queue.getState();
+ QueueState configuredState = queue.getQueueContext().getConfiguration().getConfiguredState(
+ queue.getQueuePath());
+ QueueState parentState = (queue.getParent() == null) ? null : queue.getParent().getState();
+
+ // verify that we can not any value for State other than RUNNING/STOPPED
+ if (configuredState != null && !VALID_STATE_CONFIGURATIONS.contains(configuredState)) {
+ throw new IllegalArgumentException("Invalid queue state configuration."
+ + " We can only use RUNNING or STOPPED.");
+ }
+
+ if (previousState == null) {
+ initializeState(queue, configuredState, parentState);
+ } else {
+ reinitializeState(queue, previousState, configuredState);
+ }
+ }
+
+ private static void reinitializeState(
+ AbstractCSQueue queue, QueueState previousState, QueueState configuredState) {
+ // when we get a refreshQueue request from AdminService,
+ if (previousState == QueueState.RUNNING) {
+ if (configuredState == QueueState.STOPPED) {
+ queue.stopQueue();
+ }
+ } else {
+ if (configuredState == QueueState.RUNNING) {
+ try {
+ queue.activateQueue();
+ } catch (YarnException ex) {
+ throw new IllegalArgumentException(ex.getMessage());
+ }
+ }
+ }
+ }
+
+ private static void initializeState(
+ AbstractCSQueue queue, QueueState configuredState, QueueState parentState) {
+ QueueState currentState = configuredState == null ? DEFAULT_STATE : configuredState;
+
+ if (parentState != null) {
+ if (configuredState == QueueState.RUNNING && parentState != QueueState.RUNNING) {
+ throw new IllegalArgumentException(
+ "The parent queue:" + queue.getParent().getQueuePath()
+ + " cannot be STOPPED as the child queue:" + queue.getQueuePath()
+ + " is in RUNNING state.");
+ }
+
+ if (configuredState == null) {
+ currentState = parentState == QueueState.DRAINING ? QueueState.STOPPED : parentState;
+ }
+ }
+
+ queue.updateQueueState(currentState);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org