You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by pb...@apache.org on 2021/03/04 16:24:41 UTC
[hadoop] branch trunk updated: YARN-10532. Capacity Scheduler Auto
Queue Creation: Allow auto delete queue when queue is not being used.
Contributed by Qi Zhu.
This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6699198 YARN-10532. Capacity Scheduler Auto Queue Creation: Allow auto delete queue when queue is not being used. Contributed by Qi Zhu.
6699198 is described below
commit 6699198b54bf6360c164a6ce7552c8b91a318c59
Author: Peter Bacsko <pb...@cloudera.com>
AuthorDate: Thu Mar 4 17:18:35 2021 +0100
YARN-10532. Capacity Scheduler Auto Queue Creation: Allow auto delete queue when queue is not being used. Contributed by Qi Zhu.
---
.../scheduler/capacity/AbstractCSQueue.java | 47 ++++
.../capacity/AutoCreatedQueueDeletionPolicy.java | 163 +++++++++++
.../scheduler/capacity/CapacityScheduler.java | 64 ++++-
.../capacity/CapacitySchedulerConfiguration.java | 56 ++++
.../scheduler/capacity/LeafQueue.java | 10 +
.../scheduler/capacity/ParentQueue.java | 34 ++-
...ype.java => AutoCreatedQueueDeletionEvent.java} | 49 +---
.../scheduler/event/SchedulerEventType.java | 5 +-
.../monitor/TestSchedulingMonitor.java | 43 +++
.../TestAutoCreatedQueueDeletionPolicy.java | 184 +++++++++++++
.../TestCapacitySchedulerNewQueueAutoCreation.java | 303 ++++++++++++++++++++-
11 files changed, 910 insertions(+), 48 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 28e2d54..e5380fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -154,6 +155,10 @@ public abstract class AbstractCSQueue implements CSQueue {
// is it a dynamic queue?
private boolean dynamicQueue = false;
+ // Time.monotonicNow() value (ms) of the last application submitted to
+ // this queue; a parent also refreshes it when it creates a new child.
+ private long lastSubmittedTimestamp; // consulted only for dynamic queues
+
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this(cs, cs.getConfiguration(), queueName, parent, old);
@@ -1642,4 +1647,46 @@ public abstract class AbstractCSQueue implements CSQueue {
return "capacity=" + queueCapacities.getCapacity();
}
}
+
+ public boolean isEligibleForAutoDeletion() {
+ return false; // default: not deletable; LeafQueue/ParentQueue override
+ }
+
+ public boolean isInactiveDynamicQueue() {
+ // Deletable dynamic queue idle for longer than the expiry time (seconds).
+ // Cheap checks run first so other queues skip the clock and read lock.
+ return isDynamicQueue() && isEligibleForAutoDeletion()
+ && (Time.monotonicNow() - getLastSubmittedTimestamp()) / 1000
+ > this.csContext.getConfiguration().getAutoExpiredDeletionTime();
+ }
+
+ public void updateLastSubmittedTimeStamp() {
+ writeLock.lock();
+ try {
+ this.lastSubmittedTimestamp = Time.monotonicNow(); // resets idle time
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public long getLastSubmittedTimestamp() {
+ readLock.lock(); // pairs with the writeLock held by the setters
+
+ try {
+ return lastSubmittedTimestamp;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @VisibleForTesting // exposed for tests; value is a monotonicNow() ms stamp
+ public void setLastSubmittedTimestamp(long lastSubmittedTimestamp) {
+ writeLock.lock();
+ try {
+ this.lastSubmittedTimestamp = lastSubmittedTimestamp;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueDeletionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueDeletionPolicy.java
new file mode 100644
index 0000000..4b47bb4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueDeletionPolicy.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AutoCreatedQueueDeletionEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Auto deletion policy for weight based auto created queues (V2): a queue
+ * found eligible in two consecutive intervals gets a deletion event.
+ */
+public class AutoCreatedQueueDeletionPolicy implements SchedulingEditPolicy {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AutoCreatedQueueDeletionPolicy.class);
+
+ private Clock clock;
+
+ // References to other RM components
+ private RMContext rmContext;
+ private ResourceCalculator rc;
+ private CapacityScheduler scheduler;
+
+ private long monitoringInterval;
+
+ // markedForDeletion: queues found eligible for auto deletion during
+ // the most recent interval.
+ private Set<String> markedForDeletion = new HashSet<>();
+ // sentForDeletion: queues that were already marked in the previous
+ // interval and are still eligible; a deletion event is sent for each
+ // of them, after which this set is cleared.
+ private Set<String> sentForDeletion = new HashSet<>();
+
+ @Override
+ public void init(final Configuration config, final RMContext context,
+ final ResourceScheduler sched) {
+ LOG.info("Auto Deletion Policy monitor: {}",
+ this.getClass().getCanonicalName());
+ if (!(sched instanceof CapacityScheduler)) {
+ throw new YarnRuntimeException("Class " +
+ sched.getClass().getCanonicalName() + " not instance of " +
+ CapacityScheduler.class.getCanonicalName());
+ }
+ rmContext = context;
+ scheduler = (CapacityScheduler) sched;
+ clock = scheduler.getClock();
+
+ rc = scheduler.getResourceCalculator();
+
+ CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration();
+
+ // The monitoring interval equals the configured queue expiration
+ // time, converted from seconds to milliseconds.
+ monitoringInterval =
+ csConfig.getLong(CapacitySchedulerConfiguration.
+ AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME,
+ CapacitySchedulerConfiguration.
+ DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME) * 1000;
+
+ prepareForAutoDeletion();
+ }
+
+ public void prepareForAutoDeletion() {
+ Set<String> newMarks = new HashSet<>();
+ for (Map.Entry<String, CSQueue> queueEntry :
+ scheduler.getCapacitySchedulerQueueManager().getQueues().entrySet()) {
+ String queuePath = queueEntry.getKey();
+ CSQueue queue = queueEntry.getValue();
+ if (queue instanceof AbstractCSQueue &&
+ ((AbstractCSQueue) queue).isEligibleForAutoDeletion()) {
+ if (markedForDeletion.contains(queuePath)) {
+ // Eligible in two consecutive intervals: confirm the deletion.
+ sentForDeletion.add(queuePath);
+ } else {
+ newMarks.add(queuePath);
+ }
+ }
+ }
+ markedForDeletion.clear();
+ markedForDeletion.addAll(newMarks);
+ }
+
+ @Override
+ public void editSchedule() {
+ long startTs = clock.getTime();
+
+ prepareForAutoDeletion();
+ triggerAutoDeletionForExpiredQueues();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Total time used={} ms.", clock.getTime() - startTs);
+ }
+ }
+
+ public void triggerAutoDeletionForExpiredQueues() {
+ // Send a deletion event for every queue confirmed for deletion.
+ for (String queueName : sentForDeletion) {
+ CSQueue checkQueue =
+ scheduler.getCapacitySchedulerQueueManager().
+ getQueue(queueName);
+ deleteAutoCreatedQueue(checkQueue);
+ }
+ sentForDeletion.clear();
+ }
+
+ private void deleteAutoCreatedQueue(CSQueue queue) {
+ if (queue != null) {
+ AutoCreatedQueueDeletionEvent autoCreatedQueueDeletionEvent =
+ new AutoCreatedQueueDeletionEvent(queue);
+ LOG.info("Queue:{} will trigger deletion event to CS.",
+ queue.getQueuePath());
+ scheduler.getRMContext().getDispatcher().getEventHandler().handle(
+ autoCreatedQueueDeletionEvent);
+ }
+ }
+
+ @Override
+ public long getMonitoringInterval() {
+ return monitoringInterval;
+ }
+
+ @Override
+ public String getPolicyName() {
+ return AutoCreatedQueueDeletionPolicy.class.getCanonicalName();
+ }
+
+ @VisibleForTesting
+ public Set<String> getMarkedForDeletion() {
+ return markedForDeletion;
+ }
+
+ @VisibleForTesting
+ public Set<String> getSentForDeletion() {
+ return sentForDeletion;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index ee91b0c..467dacb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -143,9 +143,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsU
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
.QueueManagementChangeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AutoCreatedQueueDeletionEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@@ -2106,11 +2106,35 @@ public class CapacityScheduler extends
}
}
break;
+ case AUTO_QUEUE_DELETION:
+ try {
+ AutoCreatedQueueDeletionEvent autoCreatedQueueDeletionEvent =
+ (AutoCreatedQueueDeletionEvent) event;
+ removeAutoCreatedQueue(autoCreatedQueueDeletionEvent.
+ getCheckQueue());
+ } catch (SchedulerDynamicEditException sde) {
+ LOG.error("Dynamic queue deletion cannot be applied for "
+ + "queue : ", sde);
+ }
+ break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
}
+ private void removeAutoCreatedQueue(CSQueue checkQueue)
+ throws SchedulerDynamicEditException{
+ writeLock.lock(); // re-check and remove under the scheduler write lock
+ try {
+ if (checkQueue instanceof AbstractCSQueue
+ && ((AbstractCSQueue) checkQueue).isInactiveDynamicQueue()) {
+ removeQueue(checkQueue); // only if still idle past the expiry time
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
private void updateNodeAttributes(
NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) {
writeLock.lock();
@@ -2564,6 +2588,44 @@ public class CapacityScheduler extends
}
}
+ // Removes an eligible dynamic queue from its parent and the queue manager.
+ public void removeQueue(CSQueue queue)
+ throws SchedulerDynamicEditException {
+ writeLock.lock();
+ try {
+ LOG.info("Removing queue: " + queue.getQueuePath());
+ // instanceof guard: report non-dynamic queues instead of a CCE.
+ if (!(queue instanceof AbstractCSQueue)
+ || !((AbstractCSQueue) queue).isDynamicQueue()) {
+ throw new SchedulerDynamicEditException("The queue that we are asked "
+ + "to remove (" + queue.getQueuePath() + ") is not a DynamicQueue");
+ }
+ if (!((AbstractCSQueue) queue).isEligibleForAutoDeletion()) {
+ LOG.warn("Queue " + queue.getQueuePath() +
+ " is marked for deletion, but not eligible for deletion");
+ return;
+ }
+
+ ParentQueue parentQueue = (ParentQueue) queue.getParent();
+ if (parentQueue == null) {
+ throw new SchedulerDynamicEditException(
+ "The queue " + queue.getQueuePath()
+ + " can't be removed because it's parent is null");
+ }
+ parentQueue.removeChildQueue(queue);
+
+ // Sanity check: the queue must be gone from the parent and manager.
+ if (parentQueue.childQueues.contains(queue) ||
+ queueManager.getQueue(queue.getQueuePath()) != null) {
+ throw new SchedulerDynamicEditException(
+ "The queue " + queue.getQueuePath()
+ + " has not been removed normally.");
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
@Override
public void addQueue(Queue queue)
throws SchedulerDynamicEditException, IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index b66ab85..90979dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -2200,6 +2200,62 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL =
1500L;
+ @Private
+ public static final boolean
+ DEFAULT_AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE = true;
+
+ @Private // per-queue key, read under getQueuePrefix(queuePath)
+ public static final String AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE =
+ AUTO_QUEUE_CREATION_V2_PREFIX + "queue-auto-removal.enable";
+
+ // Default expiration time of an idle auto created queue: 300 seconds.
+ @Private
+ public static final long
+ DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME = 300;
+
+ @Private // global key (not per queue); value is in seconds
+ public static final String AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME =
+ PREFIX + AUTO_QUEUE_CREATION_V2_PREFIX + "queue-expiration-time";
+
+ /**
+ * If true, auto created queue with weight mode
+ * will be deleted when queue is expired.
+ * @param queuePath the queue's path for auto deletion check
+ * @return true if auto created queue's deletion when expired is enabled
+ * else false. Default
+ * is true.
+ */
+ @Private
+ public boolean isAutoExpiredDeletionEnabled(String queuePath) {
+ boolean isAutoExpiredDeletionEnabled = getBoolean(
+ getQueuePrefix(queuePath) +
+ AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE,
+ DEFAULT_AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE);
+ return isAutoExpiredDeletionEnabled;
+ }
+
+ @Private
+ @VisibleForTesting
+ public void setAutoExpiredDeletionEnabled(String queuePath,
+ boolean autoRemovalEnable) {
+ String removalKey =
+ getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE;
+ setBoolean(removalKey, autoRemovalEnable); // per-queue flag
+ }
+
+ @Private
+ @VisibleForTesting
+ public void setAutoExpiredDeletionTime(long time) {
+ setLong(AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, time); // time in seconds
+ }
+
+ @Private
+ @VisibleForTesting
+ public long getAutoExpiredDeletionTime() { // queue expiration, in seconds
+ return getLong(AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME,
+ DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME);
+ }
+
/**
* Time in milliseconds between invocations
* of QueueConfigurationAutoRefreshPolicy.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 104a89c..71e65cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -614,6 +614,9 @@ public class LeafQueue extends AbstractCSQueue {
// Careful! Locking order is important!
validateSubmitApplication(applicationId, userName, queue);
+ // Signal for expired auto deletion.
+ updateLastSubmittedTimeStamp();
+
// Inform the parent queue
try {
getParent().submitApplication(applicationId, userName, queue);
@@ -2402,4 +2405,11 @@ public class LeafQueue extends AbstractCSQueue {
}
return appsToReturn;
}
+
+ @Override
+ public boolean isEligibleForAutoDeletion() {
+ return isDynamicQueue() && getNumApplications() == 0
+ && csContext.getConfiguration().
+ isAutoExpiredDeletionEnabled(this.getQueuePath());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index ce5e490..3d28933 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -570,9 +570,10 @@ public class ParentQueue extends AbstractCSQueue {
CSQueue newQueue = createNewQueue(childQueuePath, isLeaf);
this.childQueues.add(newQueue);
+ updateLastSubmittedTimeStamp();
- // Call updateClusterResource
- // , which will deal with all effectiveMin/MaxResource
+ // Call updateClusterResource.
+ // Which will deal with all effectiveMin/MaxResource
// Calculation
this.updateClusterResource(csContext.getClusterResource(),
new ResourceLimits(this.csContext.getClusterResource()));
@@ -583,6 +584,28 @@ public class ParentQueue extends AbstractCSQueue {
}
}
+
+ // Removes a child queue from this parent and from the queue manager.
+ public void removeChildQueue(CSQueue queue)
+ throws SchedulerDynamicEditException {
+ writeLock.lock();
+ try {
+ // Detach the child, then unregister it from the queue manager.
+ this.childQueues.remove(queue);
+ this.scheduler.getCapacitySchedulerQueueManager()
+ .removeQueue(queue.getQueuePath());
+
+ // Call updateClusterResource, which handles all effectiveMin/MaxResource
+ // calculation, now that the set of children has changed.
+ // (Same recomputation as after adding a dynamic child.)
+ this.updateClusterResource(csContext.getClusterResource(),
+ new ResourceLimits(this.csContext.getClusterResource()));
+
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
/**
* Check whether this queue supports adding additional child queues
* dynamically.
@@ -1607,4 +1630,11 @@ public class ParentQueue extends AbstractCSQueue {
Map<String, Float> getEffectiveMinRatioPerResource() {
return effectiveMinRatioPerResource;
}
+
+ @Override
+ public boolean isEligibleForAutoDeletion() {
+ return isDynamicQueue() && getChildQueues().size() == 0 &&
+ csContext.getConfiguration().
+ isAutoExpiredDeletionEnabled(this.getQueuePath());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java
similarity index 50%
copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java
index 869bf0ed9..68b86dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,45 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
-public enum SchedulerEventType {
-
- // Source: Node
- NODE_ADDED,
- NODE_REMOVED,
- NODE_UPDATE,
- NODE_RESOURCE_UPDATE,
- NODE_LABELS_UPDATE,
- NODE_ATTRIBUTES_UPDATE,
-
- // Source: RMApp
- APP_ADDED,
- APP_REMOVED,
-
- // Source: RMAppAttempt
- APP_ATTEMPT_ADDED,
- APP_ATTEMPT_REMOVED,
-
- // Source: ContainerAllocationExpirer
- CONTAINER_EXPIRED,
-
- // Source: SchedulerAppAttempt::pullNewlyUpdatedContainer.
- RELEASE_CONTAINER,
-
- /* Source: SchedulingEditPolicy */
- KILL_RESERVED_CONTAINER,
-
- // Mark a container for preemption
- MARK_CONTAINER_FOR_PREEMPTION,
-
- // Mark a for-preemption container killable
- MARK_CONTAINER_FOR_KILLABLE,
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
- // Cancel a killable container
- MARK_CONTAINER_FOR_NONKILLABLE,
+public class AutoCreatedQueueDeletionEvent extends SchedulerEvent {
+ private final CSQueue checkQueue; // candidate, re-checked before removal
+ public AutoCreatedQueueDeletionEvent(CSQueue checkQueue) {
+ super(SchedulerEventType.AUTO_QUEUE_DELETION);
+ this.checkQueue = checkQueue;
+ }
- //Queue Management Change
- MANAGE_QUEUE
+ public CSQueue getCheckQueue() {
+ return checkQueue;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index 869bf0ed9..3b8a1de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -55,5 +55,8 @@ public enum SchedulerEventType {
MARK_CONTAINER_FOR_NONKILLABLE,
//Queue Management Change
- MANAGE_QUEUE
+ MANAGE_QUEUE,
+
+ // Auto created queue, auto deletion check
+ AUTO_QUEUE_DELETION
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java
index 84126c7..f04081e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Proportion
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedQueueDeletionPolicy;
import org.junit.Test;
import java.util.HashSet;
@@ -91,5 +92,47 @@ public class TestSchedulingMonitor {
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
cs.reinitialize(conf, rm.getRMContext());
assertTrue(smm.isRSMEmpty());
+ rm.close();
+ }
+
+ @Test(timeout = 10000)
+ public void testRMUpdateAutoCreatedQueueDeletionPolicy() throws Exception {
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+ conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+ AutoCreatedQueueDeletionPolicy.class.getCanonicalName());
+ MockRM rm = new MockRM(conf);
+ try {
+ rm.start();
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ SchedulingMonitorManager smm = cs.getSchedulingMonitorManager();
+
+ // With monitors enabled, a scheduling monitor must be running.
+ cs.reinitialize(conf, rm.getRMContext());
+ assertFalse(smm.isRSMEmpty());
+
+ // The running policies must match the configured ones.
+ String[] configuredPolicies = conf.getStrings(
+ YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES);
+ Set<String> configurePoliciesSet = new HashSet<>();
+ for (String s : configuredPolicies) {
+ configurePoliciesSet.add(s);
+ }
+ assertTrue(smm.isSameConfiguredPolicies(configurePoliciesSet));
+
+ // The configured policies include AutoCreatedQueueDeletionPolicy.
+ assertTrue(configurePoliciesSet.
+ contains(AutoCreatedQueueDeletionPolicy.class.getCanonicalName()));
+
+ // Disable the RM scheduler monitors; no monitor may stay running.
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
+ cs.reinitialize(conf, rm.getRMContext());
+ assertTrue(smm.isRSMEmpty());
+ } finally {
+ rm.close();
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedQueueDeletionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedQueueDeletionPolicy.java
new file mode 100644
index 0000000..5359178
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedQueueDeletionPolicy.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests {@link AutoCreatedQueueDeletionPolicy} by invoking its steps
+ * ({@code editSchedule}, {@code prepareForAutoDeletion} and
+ * {@code triggerAutoDeletionForExpiredQueues}) directly against
+ * auto-created queues.
+ *
+ * <p>NOTE(review): since this class extends
+ * {@link TestCapacitySchedulerNewQueueAutoCreation}, JUnit also runs every
+ * inherited {@code @Test} method again in this class.
+ */
+public class TestAutoCreatedQueueDeletionPolicy
+    extends TestCapacitySchedulerNewQueueAutoCreation {
+  /** Upper bound in ms for each wait; waits return as soon as they pass. */
+  private static final int WAIT_TIMEOUT_MS = 5000;
+  /** Polling interval in ms for each wait. */
+  private static final int CHECK_INTERVAL_MS = 100;
+
+  private CapacityScheduler cs;
+  private AutoCreatedQueueDeletionPolicy policy;
+
+  /**
+   * Starts the scheduler, checks that the policy has nothing to delete yet
+   * and then auto-creates {@code root.e.e1}.
+   */
+  public void prepareForSchedule() throws Exception {
+    super.startScheduler();
+
+    policy = getPolicy();
+    cs = getCs();
+
+    policy.editSchedule();
+    // No queue should be marked or sent for deletion yet.
+    Assert.assertEquals(0, policy.getMarkedForDeletion().size());
+    Assert.assertEquals(0, policy.getSentForDeletion().size());
+
+    createQueue("root.e.e1");
+  }
+
+  /**
+   * Walks the policy through two deletion rounds: first the expired leaf
+   * queues {@code root.e.e1} and {@code root.e.user_0}, then their parent
+   * {@code root.e} once it has no children left.
+   */
+  @Test
+  public void testEditSchedule() throws Exception {
+    prepareForSchedule();
+    // Make sure e exists and is dynamic.
+    AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e");
+    Assert.assertNotNull(e);
+    Assert.assertTrue(e.isDynamicQueue());
+
+    // Make sure e1 exists and is dynamic.
+    AbstractCSQueue e1 = (AbstractCSQueue) cs.getQueue("root.e.e1");
+    Assert.assertNotNull(e1);
+    Assert.assertTrue(e1.isDynamicQueue());
+    // e1 was created without an application submission, so set its
+    // last-submitted timestamp explicitly; expiry is measured from it.
+    e1.setLastSubmittedTimestamp(Time.monotonicNow());
+
+    ApplicationAttemptId user0AppAttemptId =
+        submitApp(cs, USER0, USER0, "root.e");
+
+    // Wait until user_0 has been created.
+    GenericTestUtils.waitFor(() -> cs.getQueue("root.e.user_0") != null,
+        CHECK_INTERVAL_MS, WAIT_TIMEOUT_MS);
+    AbstractCSQueue user0 = (AbstractCSQueue) cs.getQueue("root.e.user_0");
+    Assert.assertNotNull(user0);
+    Assert.assertTrue(user0.isDynamicQueue());
+
+    // Finish the app: remove the attempt first, then the application.
+    AppAttemptRemovedSchedulerEvent event =
+        new AppAttemptRemovedSchedulerEvent(user0AppAttemptId,
+            RMAppAttemptState.FINISHED, false);
+    cs.handle(event);
+    AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent(
+        user0AppAttemptId.getApplicationId(), RMAppState.FINISHED);
+    cs.handle(rEvent);
+
+    // There are no apps left in user_0.
+    Assert.assertEquals(0, user0.getNumApplications());
+
+    // Wait until both leaf queues have expired.
+    waitUntilExpired(user0.getLastSubmittedTimestamp());
+    waitUntilExpired(e1.getLastSubmittedTimestamp());
+
+    // user_0 and e1 are expired, so both get marked for deletion ...
+    policy.editSchedule();
+    Assert.assertEquals(2, policy.getMarkedForDeletion().size());
+    Assert.assertTrue(
+        policy.getMarkedForDeletion().contains("root.e.user_0"));
+    Assert.assertTrue(policy.getMarkedForDeletion().contains("root.e.e1"));
+    // ... but nothing is sent for deletion on the first mark.
+    Assert.assertEquals(0, policy.getSentForDeletion().size());
+
+    // The marked queues move over to the sent-for-deletion set.
+    policy.prepareForAutoDeletion();
+    Assert.assertEquals(0, policy.getMarkedForDeletion().size());
+    Assert.assertEquals(2, policy.getSentForDeletion().size());
+
+    // e1 and user_0 still exist before removal is triggered.
+    Assert.assertNotNull(cs.getQueue("root.e.e1"));
+    Assert.assertNotNull(cs.getQueue("root.e.user_0"));
+
+    // Triggering removal empties both sets ...
+    policy.triggerAutoDeletionForExpiredQueues();
+    Assert.assertEquals(0, policy.getMarkedForDeletion().size());
+    Assert.assertEquals(0, policy.getSentForDeletion().size());
+
+    // ... and the queues may disappear asynchronously, so wait for them.
+    GenericTestUtils.waitFor(() -> cs.getQueue("root.e.e1") == null,
+        CHECK_INTERVAL_MS, WAIT_TIMEOUT_MS);
+    GenericTestUtils.waitFor(() -> cs.getQueue("root.e.user_0") == null,
+        CHECK_INTERVAL_MS, WAIT_TIMEOUT_MS);
+    Assert.assertNull(cs.getQueue("root.e.e1"));
+    Assert.assertNull(cs.getQueue("root.e.user_0"));
+
+    // The parent e, now without children, still exists.
+    e = (AbstractCSQueue) cs.getQueue("root.e");
+    Assert.assertNotNull(e);
+
+    // Once e has expired it gets marked for deletion as well.
+    waitUntilExpired(e.getLastSubmittedTimestamp());
+    policy.editSchedule();
+    e = (AbstractCSQueue) cs.getQueue("root.e");
+    Assert.assertNotNull(e);
+    Assert.assertEquals(1, policy.getMarkedForDeletion().size());
+    Assert.assertEquals(0, policy.getSentForDeletion().size());
+    Assert.assertTrue(policy.getMarkedForDeletion().contains("root.e"));
+
+    // e moves over to the sent-for-deletion set.
+    policy.prepareForAutoDeletion();
+    Assert.assertEquals(0, policy.getMarkedForDeletion().size());
+    Assert.assertEquals(1, policy.getSentForDeletion().size());
+
+    // e still exists before removal is triggered.
+    Assert.assertNotNull(cs.getQueue("root.e"));
+
+    // Trigger removal and wait until e is gone.
+    policy.triggerAutoDeletionForExpiredQueues();
+    GenericTestUtils.waitFor(() -> cs.getQueue("root.e") == null,
+        CHECK_INTERVAL_MS, WAIT_TIMEOUT_MS);
+    Assert.assertEquals(0, policy.getMarkedForDeletion().size());
+    Assert.assertEquals(0, policy.getSentForDeletion().size());
+    Assert.assertNull(cs.getQueue("root.e"));
+  }
+
+  /**
+   * Waits until more than the configured auto-expired deletion time has
+   * elapsed since {@code lastSubmittedTimestamp}.
+   *
+   * <p>The elapsed time is truncated to whole seconds and compared with
+   * {@code >}, so an expiry time of N seconds needs at least N + 1 seconds
+   * of wall-clock time; the timeout must leave room beyond that.
+   *
+   * @param lastSubmittedTimestamp a {@link Time#monotonicNow()} timestamp
+   */
+  private void waitUntilExpired(long lastSubmittedTimestamp)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> {
+      long elapsedSeconds =
+          (Time.monotonicNow() - lastSubmittedTimestamp) / 1000;
+      return elapsedSeconds > getCs().
+          getConfiguration().getAutoExpiredDeletionTime();
+    }, CHECK_INTERVAL_MS, WAIT_TIMEOUT_MS);
+  }
+}
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
index 2f83f1f..45c411f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -26,12 +28,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
@@ -39,6 +44,9 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Set;
+import java.util.HashSet;
+
public class TestCapacitySchedulerNewQueueAutoCreation
extends TestCapacitySchedulerAutoCreatedQueueBase {
private static final Logger LOG = LoggerFactory.getLogger(
@@ -50,6 +58,16 @@ public class TestCapacitySchedulerNewQueueAutoCreation
private CapacityScheduler cs;
private CapacitySchedulerConfiguration csConf;
private CapacitySchedulerAutoQueueHandler autoQueueHandler;
+  private final AutoCreatedQueueDeletionPolicy policy =
+      new AutoCreatedQueueDeletionPolicy();
+  /** Exposes the scheduler created by startScheduler() to subclasses. */
+  public CapacityScheduler getCs() {
+    return cs;
+  }
+  /** Exposes the deletion policy that startScheduler() initializes. */
+  public AutoCreatedQueueDeletionPolicy getPolicy() {
+    return policy;
+  }
/*
Create the following structure:
@@ -75,9 +93,12 @@ public class TestCapacitySchedulerNewQueueAutoCreation
csConf.setAutoQueueCreationV2Enabled("root", true);
csConf.setAutoQueueCreationV2Enabled("root.a", true);
csConf.setAutoQueueCreationV2Enabled("root.e", true);
+ csConf.setAutoQueueCreationV2Enabled(PARENT_QUEUE, true);
+ // Test for auto deletion when expired
+ csConf.setAutoExpiredDeletionTime(1);
}
- private void startScheduler() throws Exception {
+ protected void startScheduler() throws Exception {
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(csConf);
mockRM = new MockRM(csConf) {
@@ -87,6 +108,8 @@ public class TestCapacitySchedulerNewQueueAutoCreation
};
cs = (CapacityScheduler) mockRM.getResourceScheduler();
cs.updatePlacementRules();
+ // Policy for new auto created queue's auto deletion when expired
+ policy.init(cs.getConfiguration(), cs.getRMContext(), cs);
mockRM.start();
cs.start();
autoQueueHandler = new CapacitySchedulerAutoQueueHandler(
@@ -506,7 +529,7 @@ public class TestCapacitySchedulerNewQueueAutoCreation
Assert.assertTrue(user0.isDynamicQueue());
Assert.assertTrue(user0 instanceof LeafQueue);
- LeafQueue user0LeafQueue = (LeafQueue)user0;
+ LeafQueue user0LeafQueue = (LeafQueue) user0;
// Assert user limit factor is -1
Assert.assertTrue(user0LeafQueue.getUserLimitFactor() == -1);
@@ -517,10 +540,11 @@ public class TestCapacitySchedulerNewQueueAutoCreation
// Assert AM Resource
Assert.assertEquals(user0LeafQueue.getAMResourceLimit().getMemorySize(),
- user0LeafQueue.getMaxAMResourcePerQueuePercent()*MAX_MEMORY*GB, 1e-6);
+ user0LeafQueue.
+ getMaxAMResourcePerQueuePercent() * MAX_MEMORY * GB, 1e-6);
// Assert user limit (no limit) when limit factor is -1
- Assert.assertEquals(MAX_MEMORY*GB,
+ Assert.assertEquals(MAX_MEMORY * GB,
user0LeafQueue.getEffectiveMaxCapacityDown("",
user0LeafQueue.getMinimumAllocation()).getMemorySize(), 1e-6);
}
@@ -585,7 +609,274 @@ public class TestCapacitySchedulerNewQueueAutoCreation
}
- private LeafQueue createQueue(String queuePath) throws YarnException {
+  @Test
+  public void testCapacitySchedulerAutoQueueDeletion() throws Exception {
+    startScheduler();
+    csConf.setBoolean(
+        YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    csConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        AutoCreatedQueueDeletionPolicy.class.getCanonicalName());
+    csConf.setAutoExpiredDeletionTime(1);
+    cs.reinitialize(csConf, mockRM.getRMContext());
+    // The deletion policy must now be among the running monitors.
+    Set<String> policies = new HashSet<>();
+    policies.add(
+        AutoCreatedQueueDeletionPolicy.class.getCanonicalName());
+
+    Assert.assertTrue(
+        "No AutoCreatedQueueDeletionPolicy " +
+            "is present in running monitors",
+        cs.getSchedulingMonitorManager().
+            isSameConfiguredPolicies(policies));
+
+    ApplicationAttemptId a2App = submitApp(cs, USER0,
+        "a2-auto", "root.a.a1-auto");
+
+    // Wait until a2-auto has been auto-created under a1-auto.
+    GenericTestUtils.waitFor(()-> cs.getQueue(
+        "root.a.a1-auto.a2-auto") != null,
+        100, 2000);
+
+    AbstractCSQueue a1 = (AbstractCSQueue) cs.getQueue(
+        "root.a.a1-auto");
+    Assert.assertNotNull("a1 is not present", a1);
+    AbstractCSQueue a2 = (AbstractCSQueue) cs.getQueue(
+        "root.a.a1-auto.a2-auto");
+    Assert.assertNotNull("a2 is not present", a2);
+    Assert.assertTrue("a2 is not a dynamic queue",
+        a2.isDynamicQueue());
+
+    // a2 still holds the one submitted app.
+    Assert.assertEquals(1, a2.getNumApplications());
+
+    // Wait until a2 has expired (needs >= 2 s with an expiry time of 1 s).
+    long l1 = a2.getLastSubmittedTimestamp();
+    GenericTestUtils.waitFor(() -> {
+      long duration = (Time.monotonicNow() - l1)/1000;
+      return duration > csConf.getAutoExpiredDeletionTime();
+    }, 100, 5000);
+
+    // An expired queue that still has an app
+    // must not be deleted.
+    a2 = (AbstractCSQueue) cs.getQueue(
+        "root.a.a1-auto.a2-auto");
+    Assert.assertNotNull("a2 is not present", a2);
+
+    // Finish the app: remove the attempt, then the app.
+    AppAttemptRemovedSchedulerEvent event =
+        new AppAttemptRemovedSchedulerEvent(a2App,
+            RMAppAttemptState.FINISHED, false);
+    cs.handle(event);
+    AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent(
+        a2App.getApplicationId(), RMAppState.FINISHED);
+    cs.handle(rEvent);
+
+    // a2 has no apps left.
+    Assert.assertEquals(0, a2.getNumApplications());
+
+    // Wait until the expired, now empty a2 is deleted.
+    GenericTestUtils.waitFor(() -> {
+      AbstractCSQueue a2Tmp = (AbstractCSQueue) cs.getQueue(
+          "root.a.a1-auto.a2-auto");
+      return a2Tmp == null;
+    }, 100, 3000);
+
+    a2 = (AbstractCSQueue) cs.getQueue(
+        "root.a.a1-auto.a2-auto");
+    Assert.assertNull("a2 is not deleted", a2);
+
+    // The parent a1 is not deleted together with a2.
+    a1 = (AbstractCSQueue) cs.getQueue(
+        "root.a.a1-auto");
+    Assert.assertNotNull("a1 is not present", a1);
+
+    // a1 has no children left, so once it has
+    // expired it is deleted as well.
+    // Wait until a1 is deleted.
+    GenericTestUtils.waitFor(() -> {
+      AbstractCSQueue a1Tmp = (AbstractCSQueue) cs.getQueue(
+          "root.a.a1-auto");
+      return a1Tmp == null;
+    }, 100, 3000);
+    a1 = (AbstractCSQueue) cs.getQueue(
+        "root.a.a1-auto");
+    Assert.assertNull("a1 is not deleted", a1);
+  }
+
+  @Test
+  public void testCapacitySchedulerAutoQueueDeletionDisabled()
+      throws Exception {
+    startScheduler();
+    // Disable auto deletion for a2-auto before it exists.
+    csConf.setAutoExpiredDeletionEnabled(
+        "root.a.a1-auto.a2-auto", false);
+    csConf.setBoolean(
+        YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    csConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        AutoCreatedQueueDeletionPolicy.class.getCanonicalName());
+    csConf.setAutoExpiredDeletionTime(1);
+    cs.reinitialize(csConf, mockRM.getRMContext());
+    // The deletion policy must now be among the running monitors.
+    Set<String> policies = new HashSet<>();
+    policies.add(
+        AutoCreatedQueueDeletionPolicy.class.getCanonicalName());
+
+    Assert.assertTrue(
+        "No AutoCreatedQueueDeletionPolicy " +
+            "is present in running monitors",
+        cs.getSchedulingMonitorManager().
+            isSameConfiguredPolicies(policies));
+
+    ApplicationAttemptId a2App = submitApp(cs, USER0,
+        "a2-auto", "root.a.a1-auto");
+
+    // Wait until a2-auto has been auto-created under a1-auto.
+    GenericTestUtils.waitFor(()-> cs.getQueue(
+        "root.a.a1-auto.a2-auto") != null,
+        100, 2000);
+
+    AbstractCSQueue a1 = (AbstractCSQueue) cs.getQueue(
+        "root.a.a1-auto");
+    Assert.assertNotNull("a1 is not present", a1);
+    AbstractCSQueue a2 = (AbstractCSQueue) cs.getQueue(
+        "root.a.a1-auto.a2-auto");
+    Assert.assertNotNull("a2 is not present", a2);
+    Assert.assertTrue("a2 is not a dynamic queue",
+        a2.isDynamicQueue());
+
+    // Finish the app: remove the attempt, then the app.
+    AppAttemptRemovedSchedulerEvent event =
+        new AppAttemptRemovedSchedulerEvent(a2App,
+            RMAppAttemptState.FINISHED, false);
+    cs.handle(event);
+    AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent(
+        a2App.getApplicationId(), RMAppState.FINISHED);
+    cs.handle(rEvent);
+
+    // a2 has no apps left.
+    Assert.assertEquals(0, a2.getNumApplications());
+
+    // Wait until a2 has expired (needs >= 2 s with an expiry time of 1 s).
+    long l1 = a2.getLastSubmittedTimestamp();
+    GenericTestUtils.waitFor(() -> {
+      long duration = (Time.monotonicNow() - l1)/1000;
+      return duration > csConf.getAutoExpiredDeletionTime();
+    }, 100, 5000);
+
+    // Auto deletion is disabled for a2-auto, so both queues remain.
+    a1 = (AbstractCSQueue) cs.getQueue(
+        "root.a.a1-auto");
+    Assert.assertNotNull("a1 is not present", a1);
+    a2 = (AbstractCSQueue) cs.getQueue(
+        "root.a.a1-auto.a2-auto");
+    Assert.assertNotNull("a2 is not present", a2);
+    Assert.assertTrue("a2 is not a dynamic queue",
+        a2.isDynamicQueue());
+
+    // Re-enable auto deletion for a2-auto;
+    // now the expired, empty queue can be deleted.
+    csConf.setAutoExpiredDeletionEnabled(
+        "root.a.a1-auto.a2-auto", true);
+    cs.reinitialize(csConf, mockRM.getRMContext());
+
+    // Wait until a2 is deleted.
+    GenericTestUtils.waitFor(() -> {
+      AbstractCSQueue a2Tmp = (AbstractCSQueue) cs.getQueue(
+          "root.a.a1-auto.a2-auto");
+      return a2Tmp == null;
+    }, 100, 3000);
+
+    a2 = (AbstractCSQueue) cs.
+        getQueue("root.a.a1-auto.a2-auto");
+    Assert.assertNull("a2 is not deleted", a2);
+    // The parent a1 is not deleted together with a2.
+    a1 = (AbstractCSQueue) cs.getQueue(
+        "root.a.a1-auto");
+    Assert.assertNotNull("a1 is not present", a1);
+
+    // a1 has no children left, so once it has
+    // expired it is deleted as well.
+    // Wait until a1 is deleted.
+    GenericTestUtils.waitFor(() -> {
+      AbstractCSQueue a1Tmp = (AbstractCSQueue) cs.getQueue(
+          "root.a.a1-auto");
+      return a1Tmp == null;
+    }, 100, 3000);
+    a1 = (AbstractCSQueue) cs.getQueue(
+        "root.a.a1-auto");
+    Assert.assertNull("a1 is not deleted", a1);
+  }
+
+  @Test
+  public void testAutoCreateQueueAfterRemoval() throws Exception {
+    // All queue weights are 1:
+    // root
+    // - a (w=1)
+    // - b (w=1)
+    // - c-auto (w=1)
+    // - d-auto (w=1)
+    // - e-auto (w=1)
+    //   - e1-auto (w=1)
+    startScheduler();
+
+    createBasicQueueStructureAndValidate();
+
+    // Under e there is only one queue, so e1 and e have the same capacity.
+    CSQueue e1 = cs.getQueue("root.e-auto.e1-auto");
+    Assert.assertEquals(1 / 5f, e1.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, e1.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(240 * GB,
+        e1.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+
+    // Check e after removing e1.
+    cs.removeQueue(e1);
+    CSQueue e = cs.getQueue("root.e-auto");
+    Assert.assertEquals(1 / 5f, e.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, e.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(240 * GB,
+        e.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+
+    // Check d after removing e.
+    cs.removeQueue(e);
+    CSQueue d = cs.getQueue("root.d-auto");
+    Assert.assertEquals(1 / 4f, d.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, d.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(300 * GB,
+        d.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+
+    // Check c after removing d.
+    cs.removeQueue(d);
+    CSQueue c = cs.getQueue("root.c-auto");
+    Assert.assertEquals(1 / 3f, c.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(400 * GB,
+        c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+
+    // Check b after removing c.
+    cs.removeQueue(c);
+    CSQueue b = cs.getQueue("root.b");
+    Assert.assertEquals(1 / 2f, b.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, b.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(600 * GB,
+        b.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+
+    // Static queue b cannot be removed.
+    try {
+      cs.removeQueue(b);
+      Assert.fail("Removing static queue b should have failed!");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex
+          instanceof SchedulerDynamicEditException);
+    }
+    // Check a: with only a and b left under root, a also gets half.
+    CSQueue a = cs.getQueue("root.a");
+    Assert.assertEquals(1 / 2f, a.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, a.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(600 * GB,
+        a.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+  }
+
+  protected LeafQueue createQueue(final String queuePath) throws YarnException {
     return autoQueueHandler.autoCreateQueue(
         CSQueueUtils.extractQueuePath(queuePath));
   }
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org