You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/05/30 21:11:58 UTC
[08/50] [abbrv] hadoop git commit: YARN-8191. Fair scheduler: queue
deletion without RM restart. (Gergo Repas via Haibo Chen)
YARN-8191. Fair scheduler: queue deletion without RM restart. (Gergo Repas via Haibo Chen)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/86bc6425
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/86bc6425
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/86bc6425
Branch: refs/heads/HDDS-48
Commit: 86bc6425d425913899f1d951498bd040e453b3d0
Parents: d9852eb
Author: Haibo Chen <ha...@apache.org>
Authored: Thu May 24 17:07:21 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Thu May 24 17:12:34 2018 -0700
----------------------------------------------------------------------
.../fair/AllocationFileLoaderService.java | 16 +-
.../scheduler/fair/FSLeafQueue.java | 31 ++
.../resourcemanager/scheduler/fair/FSQueue.java | 9 +
.../scheduler/fair/FairScheduler.java | 29 +-
.../scheduler/fair/QueueManager.java | 155 +++++++--
.../fair/TestAllocationFileLoaderService.java | 100 +++---
.../scheduler/fair/TestQueueManager.java | 337 +++++++++++++++++++
7 files changed, 596 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index d8d9051..7a40b6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -87,7 +87,7 @@ public class AllocationFileLoaderService extends AbstractService {
private Path allocFile;
private FileSystem fs;
- private Listener reloadListener;
+ private final Listener reloadListener;
@VisibleForTesting
long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;
@@ -95,15 +95,16 @@ public class AllocationFileLoaderService extends AbstractService {
private Thread reloadThread;
private volatile boolean running = true;
- public AllocationFileLoaderService() {
- this(SystemClock.getInstance());
+ public AllocationFileLoaderService(Listener reloadListener) {
+ this(reloadListener, SystemClock.getInstance());
}
private List<Permission> defaultPermissions;
- public AllocationFileLoaderService(Clock clock) {
+ public AllocationFileLoaderService(Listener reloadListener, Clock clock) {
super(AllocationFileLoaderService.class.getName());
this.clock = clock;
+ this.reloadListener = reloadListener;
}
@Override
@@ -114,6 +115,7 @@ public class AllocationFileLoaderService extends AbstractService {
reloadThread = new Thread(() -> {
while (running) {
try {
+ reloadListener.onCheck();
long time = clock.getTime();
long lastModified =
fs.getFileStatus(allocFile).getModificationTime();
@@ -207,10 +209,6 @@ public class AllocationFileLoaderService extends AbstractService {
return allocPath;
}
- public synchronized void setReloadListener(Listener reloadListener) {
- this.reloadListener = reloadListener;
- }
-
/**
* Updates the allocation list from the allocation config file. This file is
* expected to be in the XML format specified in the design doc.
@@ -351,5 +349,7 @@ public class AllocationFileLoaderService extends AbstractService {
public interface Listener {
void onReload(AllocationConfiguration info) throws IOException;
+
+ void onCheck();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 49d2166..e7da16f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -34,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -56,6 +59,8 @@ public class FSLeafQueue extends FSQueue {
// apps that are runnable
private final List<FSAppAttempt> runnableApps = new ArrayList<>();
private final List<FSAppAttempt> nonRunnableApps = new ArrayList<>();
+ // assignedApps keeps track of applications that have no appAttempts
+ private final Set<ApplicationId> assignedApps = new HashSet<>();
// get a lock with fair distribution for app list updates
private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
private final Lock readLock = rwl.readLock();
@@ -89,6 +94,9 @@ public class FSLeafQueue extends FSQueue {
} else {
nonRunnableApps.add(app);
}
+ // when an appAttempt is created for an application, we'd like to move
+ // it over from assignedApps to either runnableApps or nonRunnableApps
+ assignedApps.remove(app.getApplicationId());
incUsedResource(app.getResourceUsage());
} finally {
writeLock.unlock();
@@ -440,6 +448,15 @@ public class FSLeafQueue extends FSQueue {
return numPendingApps;
}
+ public int getNumAssignedApps() {
+ readLock.lock();
+ try {
+ return assignedApps.size();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
/**
* TODO: Based on how frequently this is called, we might want to club
* counting pending and active apps in the same method.
@@ -609,4 +626,18 @@ public class FSLeafQueue extends FSQueue {
", LastTimeAtMinShare: " + lastTimeAtMinShare +
"}");
}
+
+ /**
+ * This method is called when an application is assigned to this queue
+ * for book-keeping purposes (to be able to determine if the queue is empty).
+ * @param applicationId the application's id
+ */
+ public void addAssignedApp(ApplicationId applicationId) {
+ writeLock.lock();
+ try {
+ assignedApps.add(applicationId);
+ } finally {
+ writeLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index 4babfd5..6b88a32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -83,6 +83,7 @@ public abstract class FSQueue implements Queue, Schedulable {
private long minSharePreemptionTimeout = Long.MAX_VALUE;
private float fairSharePreemptionThreshold = 0.5f;
private boolean preemptable = true;
+ private boolean isDynamic = true;
public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
this.name = name;
@@ -585,4 +586,12 @@ public abstract class FSQueue implements Queue, Schedulable {
* @param sb the {code StringBuilder} which holds queue states
*/
protected abstract void dumpStateInternal(StringBuilder sb);
+
+ public boolean isDynamic() {
+ return isDynamic;
+ }
+
+ public void setDynamic(boolean dynamic) {
+ this.isDynamic = dynamic;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 1c4bd51..4c84aa9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -99,6 +99,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
@@ -207,7 +208,8 @@ public class FairScheduler extends
public FairScheduler() {
super(FairScheduler.class.getName());
context = new FSContext(this);
- allocsLoader = new AllocationFileLoaderService();
+ allocsLoader =
+ new AllocationFileLoaderService(new AllocationReloadListener());
queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
}
@@ -516,6 +518,7 @@ public class FairScheduler extends
new SchedulerApplication<FSAppAttempt>(queue, user);
applications.put(applicationId, application);
queue.getMetrics().submitApp(user);
+ queue.addAssignedApp(applicationId);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queue.getName()
@@ -1435,7 +1438,6 @@ public class FairScheduler extends
}
allocsLoader.init(conf);
- allocsLoader.setReloadListener(new AllocationReloadListener());
// If we fail to load allocations file on initialize, we want to fail
// immediately. After a successful load, exceptions on future reloads
// will just result in leaving things as they are.
@@ -1589,6 +1591,7 @@ public class FairScheduler extends
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
+ Set<String> removedStaticQueues = getRemovedStaticQueues(queueInfo);
writeLock.lock();
try {
if (queueInfo == null) {
@@ -1599,6 +1602,7 @@ public class FairScheduler extends
setQueueAcls(allocConf.getQueueAcls());
allocConf.getDefaultSchedulingPolicy().initialize(getContext());
queueMgr.updateAllocationConfiguration(allocConf);
+ queueMgr.setQueuesToDynamic(removedStaticQueues);
applyChildDefaults();
maxRunningEnforcer.updateRunnabilityOnReload();
}
@@ -1606,6 +1610,27 @@ public class FairScheduler extends
writeLock.unlock();
}
}
+
+ private Set<String> getRemovedStaticQueues(
+ AllocationConfiguration queueInfo) {
+ if (queueInfo == null || allocConf == null) {
+ return Collections.emptySet();
+ }
+ Set<String> removedStaticQueues = new HashSet<>();
+ for (Set<String> queues : allocConf.getConfiguredQueues().values()) {
+ removedStaticQueues.addAll(queues);
+ }
+ for (Set<String> queues : queueInfo.getConfiguredQueues().values()) {
+ removedStaticQueues.removeAll(queues);
+ }
+ return removedStaticQueues;
+ }
+
+ @Override
+ public void onCheck() {
+ queueMgr.removeEmptyDynamicQueues();
+ queueMgr.removePendingIncompatibleQueues();
+ }
}
private void setQueueAcls(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 8734877..632a842 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -22,13 +22,17 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.xml.parsers.ParserConfigurationException;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -52,6 +56,36 @@ public class QueueManager {
public static final Log LOG = LogFactory.getLog(
QueueManager.class.getName());
+ private final class IncompatibleQueueRemovalTask {
+
+ private final String queueToCreate;
+ private final FSQueueType queueType;
+
+ private IncompatibleQueueRemovalTask(String queueToCreate,
+ FSQueueType queueType) {
+ this.queueToCreate = queueToCreate;
+ this.queueType = queueType;
+ }
+
+ private void execute() {
+ Boolean removed =
+ removeEmptyIncompatibleQueues(queueToCreate, queueType).orElse(null);
+ if (Boolean.TRUE.equals(removed)) {
+ FSQueue queue = getQueue(queueToCreate, true, queueType, false);
+ if (queue != null &&
+ // if queueToCreate is present in the allocation config, set it
+ // to static
+ scheduler.allocConf.configuredQueues.values().stream()
+ .anyMatch(s -> s.contains(queueToCreate))) {
+ queue.setDynamic(false);
+ }
+ }
+ if (!Boolean.FALSE.equals(removed)) {
+ incompatibleQueuesPendingRemoval.remove(this);
+ }
+ }
+ }
+
public static final String ROOT_QUEUE = "root";
private final FairScheduler scheduler;
@@ -59,6 +93,8 @@ public class QueueManager {
private final Collection<FSLeafQueue> leafQueues =
new CopyOnWriteArrayList<FSLeafQueue>();
private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+ private Set<IncompatibleQueueRemovalTask> incompatibleQueuesPendingRemoval =
+ new HashSet<>();
private FSParentQueue rootQueue;
public QueueManager(FairScheduler scheduler) {
@@ -75,10 +111,13 @@ public class QueueManager {
// SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been
// loaded yet.
rootQueue = new FSParentQueue("root", scheduler, null);
+ rootQueue.setDynamic(false);
queues.put(rootQueue.getName(), rootQueue);
// Create the default queue
- getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
+ FSLeafQueue defaultQueue =
+ getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
+ defaultQueue.setDynamic(false);
// Recursively reinitialize to propagate queue properties
rootQueue.reinit(true);
}
@@ -121,7 +160,8 @@ public class QueueManager {
*/
public boolean removeLeafQueue(String name) {
name = ensureRootPrefix(name);
- return removeEmptyIncompatibleQueues(name, FSQueueType.PARENT);
+ return !Boolean.FALSE.equals(
+ removeEmptyIncompatibleQueues(name, FSQueueType.PARENT).orElse(null));
}
@@ -346,9 +386,13 @@ public class QueueManager {
*
* We will never remove the root queue or the default queue in this way.
*
- * @return true if we can create queueToCreate or it already exists.
+ * @return Optional.of(Boolean.TRUE) if there was an incompatible queue that
+ * has been removed,
+ * Optional.of(Boolean.FALSE) if there was an incompatible queue that
+ * has not been removed,
+ * Optional.empty() if there is no incompatible queue.
*/
- private boolean removeEmptyIncompatibleQueues(String queueToCreate,
+ private Optional<Boolean> removeEmptyIncompatibleQueues(String queueToCreate,
FSQueueType queueType) {
queueToCreate = ensureRootPrefix(queueToCreate);
@@ -357,7 +401,7 @@ public class QueueManager {
if (queueToCreate.equals(ROOT_QUEUE) ||
queueToCreate.startsWith(
ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
- return false;
+ return Optional.empty();
}
FSQueue queue = queues.get(queueToCreate);
@@ -365,19 +409,18 @@ public class QueueManager {
if (queue != null) {
if (queue instanceof FSLeafQueue) {
if (queueType == FSQueueType.LEAF) {
- // if queue is already a leaf then return true
- return true;
+ return Optional.empty();
}
// remove incompatibility since queue is a leaf currently
// needs to change to a parent.
- return removeQueueIfEmpty(queue);
+ return Optional.of(removeQueueIfEmpty(queue));
} else {
if (queueType == FSQueueType.PARENT) {
- return true;
+ return Optional.empty();
}
// If it's an existing parent queue and needs to change to leaf,
// remove it if it's empty.
- return removeQueueIfEmpty(queue);
+ return Optional.of(removeQueueIfEmpty(queue));
}
}
@@ -389,11 +432,51 @@ public class QueueManager {
String prefixString = queueToCreate.substring(0, sepIndex);
FSQueue prefixQueue = queues.get(prefixString);
if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) {
- return removeQueueIfEmpty(prefixQueue);
+ return Optional.of(removeQueueIfEmpty(prefixQueue));
}
sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
}
- return true;
+ return Optional.empty();
+ }
+
+ /**
+ * Removes all empty dynamic queues (including empty dynamic parent queues).
+ */
+ public void removeEmptyDynamicQueues() {
+ synchronized (queues) {
+ Set<FSParentQueue> parentQueuesToCheck = new HashSet<>();
+ for (FSQueue queue : getQueues()) {
+ if (queue.isDynamic() && queue.getChildQueues().isEmpty()) {
+ boolean removed = removeQueueIfEmpty(queue);
+ if (removed && queue.getParent().isDynamic()) {
+ parentQueuesToCheck.add(queue.getParent());
+ }
+ }
+ }
+ while (!parentQueuesToCheck.isEmpty()) {
+ FSParentQueue queue = parentQueuesToCheck.iterator().next();
+ if (queue.getChildQueues().isEmpty()) {
+ removeQueue(queue);
+ if (queue.getParent().isDynamic()) {
+ parentQueuesToCheck.add(queue.getParent());
+ }
+ }
+ parentQueuesToCheck.remove(queue);
+ }
+ }
+ }
+
+ /**
+ * Re-checking incompatible queues that could not be removed earlier due to
+ * not being empty, and removing those that became empty.
+ */
+ public void removePendingIncompatibleQueues() {
+ synchronized (queues) {
+ for (IncompatibleQueueRemovalTask removalTask :
+ ImmutableSet.copyOf(incompatibleQueuesPendingRemoval)) {
+ removalTask.execute();
+ }
+ }
}
/**
@@ -435,7 +518,8 @@ public class QueueManager {
if (queue instanceof FSLeafQueue) {
FSLeafQueue leafQueue = (FSLeafQueue)queue;
return queue.getNumRunnableApps() == 0 &&
- leafQueue.getNumNonRunnableApps() == 0;
+ leafQueue.getNumNonRunnableApps() == 0 &&
+ leafQueue.getNumAssignedApps() == 0;
} else {
for (FSQueue child : queue.getChildQueues()) {
if (!isEmpty(child)) {
@@ -501,21 +585,13 @@ public class QueueManager {
LOG.error("Setting scheduling policies for existing queues failed!");
}
- for (String name : queueConf.getConfiguredQueues().get(
- FSQueueType.LEAF)) {
- if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
- getLeafQueue(name, true, false);
- }
- }
+ ensureQueueExistsAndIsCompatibleAndIsStatic(queueConf, FSQueueType.LEAF);
+
// At this point all leaves and 'parents with
// at least one child' would have been created.
// Now create parents with no configured leaf.
- for (String name : queueConf.getConfiguredQueues().get(
- FSQueueType.PARENT)) {
- if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
- getParentQueue(name, true, false);
- }
- }
+ ensureQueueExistsAndIsCompatibleAndIsStatic(queueConf,
+ FSQueueType.PARENT);
}
// Initialize all queues recursively
@@ -524,6 +600,35 @@ public class QueueManager {
rootQueue.recomputeSteadyShares();
}
+ private void ensureQueueExistsAndIsCompatibleAndIsStatic(
+ AllocationConfiguration queueConf, FSQueueType queueType) {
+ for (String name : queueConf.getConfiguredQueues().get(queueType)) {
+ Boolean removed =
+ removeEmptyIncompatibleQueues(name, queueType).orElse(null);
+ if (Boolean.FALSE.equals(removed)) {
+ incompatibleQueuesPendingRemoval.add(
+ new IncompatibleQueueRemovalTask(name, queueType));
+ } else {
+ FSQueue queue = getQueue(name, true, queueType, false);
+ if (queue != null) {
+ queue.setDynamic(false);
+ }
+ }
+ }
+ }
+
+ /**
+ * Setting a set of queues to dynamic.
+ * @param queueNames The names of the queues to be set to dynamic
+ */
+ protected void setQueuesToDynamic(Set<String> queueNames) {
+ synchronized (queues) {
+ for (String queueName : queueNames) {
+ queues.get(queueName).setDynamic(true);
+ }
+ }
+ }
+
/**
* Check whether queue name is valid,
* return true if it is valid, otherwise return false.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
index 8591d67..30b8a91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService.Listener;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
@@ -32,6 +33,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.Fai
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Test;
+import org.mockito.Mockito;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
@@ -79,7 +82,8 @@ public class TestAllocationFileLoaderService {
fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(Mockito.mock(Listener.class));
Path allocationFile = allocLoader.getAllocationFile(conf);
assertEquals(fsAllocPath, allocationFile.toString());
assertTrue(fs.exists(allocationFile));
@@ -92,7 +96,8 @@ public class TestAllocationFileLoaderService {
throws UnsupportedFileSystemException {
Configuration conf = new YarnConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(Mockito.mock(Listener.class));
allocLoader.getAllocationFile(conf);
}
@@ -105,7 +110,7 @@ public class TestAllocationFileLoaderService {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
TEST_FAIRSCHED_XML);
AllocationFileLoaderService allocLoader =
- new AllocationFileLoaderService();
+ new AllocationFileLoaderService(Mockito.mock(Listener.class));
Path allocationFile = allocLoader.getAllocationFile(conf);
assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
assertTrue(fs.exists(allocationFile));
@@ -134,12 +139,11 @@ public class TestAllocationFileLoaderService {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(
- clock);
+ ReloadListener confHolder = new ReloadListener();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder, clock);
allocLoader.reloadIntervalMs = 5;
allocLoader.init(conf);
- ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration allocConf = confHolder.allocConf;
@@ -205,7 +209,9 @@ public class TestAllocationFileLoaderService {
public void testAllocationFileParsing() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ ReloadListener confHolder = new ReloadListener();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
AllocationFileWriter
.create()
@@ -278,8 +284,6 @@ public class TestAllocationFileLoaderService {
.writeToFile(ALLOC_FILE);
allocLoader.init(conf);
- ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf;
@@ -427,7 +431,9 @@ public class TestAllocationFileLoaderService {
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ ReloadListener confHolder = new ReloadListener();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
@@ -473,8 +479,6 @@ public class TestAllocationFileLoaderService {
out.close();
allocLoader.init(conf);
- ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf;
@@ -550,10 +554,10 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
- allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
+ allocLoader.init(conf);
allocLoader.reloadAllocations();
AllocationConfiguration allocConf = confHolder.allocConf;
@@ -584,10 +588,10 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
- allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
+ allocLoader.init(conf);
allocLoader.reloadAllocations();
}
@@ -608,10 +612,10 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
- allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
+ allocLoader.init(conf);
allocLoader.reloadAllocations();
}
@@ -632,10 +636,10 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
- allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
+ allocLoader.init(conf);
allocLoader.reloadAllocations();
}
@@ -654,10 +658,10 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
- allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
+ allocLoader.init(conf);
try {
allocLoader.reloadAllocations();
} catch (AllocationConfigurationException ex) {
@@ -685,10 +689,10 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
- allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
+ allocLoader.init(conf);
try {
allocLoader.reloadAllocations();
} catch (AllocationConfigurationException ex) {
@@ -714,10 +718,10 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
- allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
+ allocLoader.init(conf);
allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf;
// Check whether queue 'parent' and 'child' are loaded successfully
@@ -745,10 +749,10 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
- allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
+ allocLoader.init(conf);
allocLoader.reloadAllocations();
}
@@ -767,10 +771,10 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
- allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
+ allocLoader.init(conf);
allocLoader.reloadAllocations();
}
@@ -793,10 +797,10 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
- allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
+ allocLoader.init(conf);
allocLoader.reloadAllocations();
AllocationConfiguration allocConf = confHolder.allocConf;
@@ -853,10 +857,10 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
- allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
- allocLoader.setReloadListener(confHolder);
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(confHolder);
+ allocLoader.init(conf);
allocLoader.reloadAllocations();
}
@@ -867,5 +871,9 @@ public class TestAllocationFileLoaderService {
public void onReload(AllocationConfiguration info) {
allocConf = info;
}
+
+ @Override
+ public void onCheck() {
+ // Intentionally empty: this listener only records the configuration
+ // handed to onReload().
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
index eb2d402..3674ffb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
@@ -20,15 +20,22 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
public class TestQueueManager {
@@ -305,4 +312,334 @@ public class TestQueueManager {
assertEquals("createQueue() returned wrong queue",
"root.queue1.queue2", q2.getName());
}
+
+ /**
+ * A dynamic leaf queue has to survive the cleanup calls while it is marked
+ * non-empty, and has to be removed (without its static parent) once it is
+ * no longer marked non-empty.
+ */
+ @Test
+ public void testRemovalOfDynamicLeafQueue() {
+ final String leafName = "root.test.childB.dynamic1";
+ queueManager.updateAllocationConfiguration(
+ scheduler.getAllocationConfiguration());
+
+ FSQueue dynamicLeaf = queueManager.getLeafQueue(leafName, true);
+ assertNotNull("Queue root.test.childB.dynamic1 was not created",
+ dynamicLeaf);
+ assertEquals("createQueue() returned wrong queue",
+ "root.test.childB.dynamic1", dynamicLeaf.getName());
+ assertTrue("root.test.childB.dynamic1 is not a dynamic queue",
+ dynamicLeaf.isDynamic());
+
+ // pretend an application was submitted to the queue
+ notEmptyQueues.add(dynamicLeaf);
+
+ // while the queue is non-empty the cleanup must leave it in place
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+ dynamicLeaf = queueManager.getLeafQueue(leafName, false);
+ assertNotNull("Queue root.test.childB.dynamic1 was deleted", dynamicLeaf);
+
+ // pretend the application finished; the next cleanup should drop the
+ // dynamic leaf but keep its static parent
+ notEmptyQueues.remove(dynamicLeaf);
+
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+ dynamicLeaf = queueManager.getLeafQueue(leafName, false);
+ assertNull("Queue root.test.childB.dynamic1 was not deleted", dynamicLeaf);
+ assertNotNull("The static parent of root.test.childB.dynamic1 was deleted",
+ queueManager.getParentQueue("root.test.childB", false));
+ }
+
+ /**
+ * Expects both root.parent1.dynamic1 and its parent root.parent1 to be
+ * dynamic after the leaf is created on demand, and expects a single cleanup
+ * pass to remove both of them while nothing is marked non-empty.
+ */
+ @Test
+ public void testRemovalOfDynamicParentQueue() {
+ final String parentName = "root.parent1";
+ final String leafName = "root.parent1.dynamic1";
+ queueManager.updateAllocationConfiguration(
+ scheduler.getAllocationConfiguration());
+
+ FSQueue leaf = queueManager.getLeafQueue(leafName, true);
+ assertNotNull("Queue root.parent1.dynamic1 was not created", leaf);
+ assertEquals("createQueue() returned wrong queue",
+ "root.parent1.dynamic1", leaf.getName());
+ assertTrue("root.parent1.dynamic1 is not a dynamic queue",
+ leaf.isDynamic());
+
+ FSQueue parent = queueManager.getParentQueue(parentName, false);
+ assertNotNull("Queue root.parent1 was not created", parent);
+ assertTrue("root.parent1 is not a dynamic queue", parent.isDynamic());
+
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+
+ // look both queues up again before asserting on either of them
+ leaf = queueManager.getLeafQueue(leafName, false);
+ parent = queueManager.getParentQueue(parentName, false);
+ assertNull("Queue root.parent1.dynamic1 was not deleted", leaf);
+ assertNull("Queue root.parent1 was not deleted", parent);
+ }
+
+ /**
+ * Checks that a non-empty dynamic leaf queue is kept by the cleanup calls,
+ * is reported as static once it is added to the allocation configuration,
+ * and is still present (and static) after it is no longer marked non-empty.
+ */
+ @Test
+ public void testNonEmptyDynamicQueueBecomingStaticQueue() {
+ AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+
+ queueManager.updateAllocationConfiguration(allocConf);
+
+ FSLeafQueue q1 = queueManager.getLeafQueue("root.leaf1", true);
+
+ assertNotNull("Queue root.leaf1 was not created", q1);
+ assertEquals("createQueue() returned wrong queue",
+ "root.leaf1", q1.getName());
+ assertTrue("root.leaf1 is not a dynamic queue", q1.isDynamic());
+
+ // pretend that we submitted an app to the queue
+ notEmptyQueues.add(q1);
+
+ // non-empty queues should not be deleted
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+ q1 = queueManager.getLeafQueue("root.leaf1", false);
+ assertNotNull("Queue root.leaf1 was deleted", q1);
+
+ // next we add leaf1 under root in the allocation config
+ allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.leaf1");
+ queueManager.updateAllocationConfiguration(allocConf);
+
+ // updateAllocationConfiguration() should make root.leaf1 a static queue
+ assertFalse("root.leaf1 is not a static queue", q1.isDynamic());
+
+ // the application has finished and the queue is empty, but leaf1 is a
+ // static queue at this point, so it must not be affected by
+ // removeEmptyDynamicQueues()
+ notEmptyQueues.clear();
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+ q1 = queueManager.getLeafQueue("root.leaf1", false);
+ assertNotNull("Queue root.leaf1 was deleted", q1);
+ assertFalse("root.leaf1 is not a static queue", q1.isDynamic());
+ }
+
+ /**
+ * Checks that a non-empty static leaf queue is untouched by the cleanup
+ * calls, is reported as dynamic after every configured queue is switched to
+ * dynamic and removed from the allocation configuration, and is deleted
+ * together with its parent root.test once it is no longer marked non-empty.
+ */
+ @Test
+ public void testNonEmptyStaticQueueBecomingDynamicQueue() {
+ AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+ queueManager.updateAllocationConfiguration(allocConf);
+
+ FSLeafQueue q1 = queueManager.getLeafQueue("root.test.childA", false);
+
+ assertNotNull("Queue root.test.childA does not exist", q1);
+ assertEquals("createQueue() returned wrong queue",
+ "root.test.childA", q1.getName());
+ assertFalse("root.test.childA is not a static queue", q1.isDynamic());
+
+ // we submitted an app to the queue
+ notEmptyQueues.add(q1);
+
+ // the next removeEmptyDynamicQueues() call should not modify
+ // root.test.childA
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+ q1 = queueManager.getLeafQueue("root.test.childA", false);
+ assertNotNull("Queue root.test.childA was deleted", q1);
+ // the queue must still be static here; the message describes the failure
+ // that assertFalse(isDynamic()) actually reports
+ assertFalse("root.test.childA is not a static queue", q1.isDynamic());
+
+ // next we remove all queues from the allocation config,
+ // this causes all queues to change to dynamic
+ for (Set<String> queueNames : allocConf.configuredQueues.values()) {
+ queueManager.setQueuesToDynamic(queueNames);
+ queueNames.clear();
+ }
+ queueManager.updateAllocationConfiguration(allocConf);
+
+ q1 = queueManager.getLeafQueue("root.test.childA", false);
+ assertNotNull("Queue root.test.childA was deleted", q1);
+ assertTrue("root.test.childA is not a dynamic queue", q1.isDynamic());
+
+ // application finished - the queue no longer has any apps, so
+ // the next removeEmptyDynamicQueues() call should remove the queues
+ notEmptyQueues.remove(q1);
+
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+
+ q1 = queueManager.getLeafQueue("root.test.childA", false);
+ assertNull("Queue root.test.childA was not deleted", q1);
+
+ FSParentQueue p1 = queueManager.getParentQueue("root.test", false);
+ assertNull("Queue root.test was not deleted", p1);
+ }
+
+ /**
+ * Checks that the static parent queue root.test.childB is kept by the
+ * cleanup calls while it is configured, and is removed by them after it has
+ * been dropped from the allocation configuration and switched to dynamic.
+ */
+ @Test
+ public void testRemovalOfChildlessParentQueue() {
+ AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+ queueManager.updateAllocationConfiguration(allocConf);
+
+ FSParentQueue q1 = queueManager.getParentQueue("root.test.childB", false);
+
+ assertNotNull("Queue root.test.childB was not created", q1);
+ assertEquals("createQueue() returned wrong queue",
+ "root.test.childB", q1.getName());
+ assertFalse("root.test.childB is a dynamic queue", q1.isDynamic());
+
+ // static queues should not be deleted
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+ q1 = queueManager.getParentQueue("root.test.childB", false);
+ assertNotNull("Queue root.test.childB was deleted", q1);
+
+ // next we remove root.test.childB from the allocation config
+ allocConf.configuredQueues.get(FSQueueType.PARENT)
+ .remove("root.test.childB");
+ queueManager.updateAllocationConfiguration(allocConf);
+ queueManager.setQueuesToDynamic(Collections.singleton("root.test.childB"));
+
+ // the next removeEmptyDynamicQueues() call should clean
+ // root.test.childB up; look up the queue under test itself, since a
+ // lookup of a queue this test never created would pass vacuously
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+ q1 = queueManager.getParentQueue("root.test.childB", false);
+ assertNull("Queue root.test.childB was not deleted", q1);
+ }
+
+ /**
+ * Expects root.parent1 and root.parent1.leaf1 to start out dynamic, to be
+ * reported as static after they are added to the allocation configuration,
+ * and to be reported as dynamic again after they are removed from it and
+ * passed to setQueuesToDynamic().
+ */
+ @Test
+ public void testQueueTypeChange() {
+ final String parentName = "root.parent1";
+ final String leafName = "root.parent1.leaf1";
+ AllocationConfiguration config = scheduler.getAllocationConfiguration();
+ queueManager.updateAllocationConfiguration(config);
+
+ FSQueue leaf = queueManager.getLeafQueue(leafName, true);
+ assertNotNull("Queue root.parent1.leaf1 was not created", leaf);
+ assertEquals("createQueue() returned wrong queue",
+ "root.parent1.leaf1", leaf.getName());
+ assertTrue("root.parent1.leaf1 is not a dynamic queue", leaf.isDynamic());
+
+ FSQueue parent = queueManager.getParentQueue(parentName, false);
+ assertNotNull("Queue root.parent1 was not created", parent);
+ assertTrue("root.parent1 is not a dynamic queue", parent.isDynamic());
+
+ // declare both queues in the allocation config ...
+ config.configuredQueues.get(FSQueueType.PARENT).add(parentName);
+ config.configuredQueues.get(FSQueueType.LEAF).add(leafName);
+
+ // ... which should turn both of them into static queues on update
+ queueManager.updateAllocationConfiguration(config);
+ leaf = queueManager.getLeafQueue(leafName, false);
+ assertFalse("root.parent1.leaf1 is not a static queue", leaf.isDynamic());
+ parent = queueManager.getParentQueue(parentName, false);
+ assertFalse("root.parent1 is not a static queue", parent.isDynamic());
+
+ // take both queues out of the allocation config again ...
+ config.configuredQueues.get(FSQueueType.PARENT).remove(parentName);
+ config.configuredQueues.get(FSQueueType.LEAF).remove(leafName);
+
+ // ... and flag them as dynamic after the update
+ queueManager.updateAllocationConfiguration(config);
+ queueManager.setQueuesToDynamic(ImmutableSet.of(parentName, leafName));
+ leaf = queueManager.getLeafQueue(leafName, false);
+ assertTrue("root.parent1.leaf1 is not a dynamic queue", leaf.isDynamic());
+ parent = queueManager.getParentQueue(parentName, false);
+ assertTrue("root.parent1 is not a dynamic queue", parent.isDynamic());
+ }
+
+ /**
+ * Checks that the dynamic leaf queue root.leaf1 is kept by the cleanup
+ * calls while an application is assigned to it via addAssignedApp() and
+ * while an app attempt is added via addApp(), and that it is reported empty
+ * and removed after removeApp() is called for that attempt.
+ *
+ * NOTE(review): the test replaces the shared queueManager with a fresh
+ * QueueManager - presumably so that the real isEmpty() logic is exercised
+ * instead of the notEmptyQueues fixture; confirm against the setup method.
+ */
+ @Test
+ public void testApplicationAssignmentPreventsRemovalOfDynamicQueue()
+ throws Exception {
+ AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+ queueManager = new QueueManager(scheduler);
+ queueManager.initialize(conf);
+ queueManager.updateAllocationConfiguration(allocConf);
+
+ FSLeafQueue q = queueManager.getLeafQueue("root.leaf1", true);
+ assertNotNull("root.leaf1 does not exist", q);
+ assertTrue("root.leaf1 is not empty", queueManager.isEmpty(q));
+
+ // assigning an application (without an appAttempt so far) to the queue
+ // removeEmptyDynamicQueues() should not remove the queue
+ ApplicationId applicationId = ApplicationId.newInstance(1L, 0);
+ q.addAssignedApp(applicationId);
+ q = queueManager.getLeafQueue("root.leaf1", false);
+ assertFalse("root.leaf1 is empty", queueManager.isEmpty(q));
+
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+ q = queueManager.getLeafQueue("root.leaf1", false);
+ assertNotNull("root.leaf1 has been removed", q);
+ assertFalse("root.leaf1 is empty", queueManager.isEmpty(q));
+
+ ApplicationAttemptId applicationAttemptId =
+ ApplicationAttemptId.newInstance(applicationId, 0);
+ ActiveUsersManager activeUsersManager =
+ Mockito.mock(ActiveUsersManager.class);
+ RMContext rmContext = Mockito.mock(RMContext.class);
+
+ // the appAttempt is created
+ // removeEmptyDynamicQueues() should not remove the queue
+ FSAppAttempt appAttempt = new FSAppAttempt(scheduler, applicationAttemptId,
+ "a_user", q, activeUsersManager, rmContext);
+ q.addApp(appAttempt, true);
+ queueManager.removeEmptyDynamicQueues();
+ q = queueManager.getLeafQueue("root.leaf1", false);
+ assertNotNull("root.leaf1 has been removed", q);
+ assertFalse("root.leaf1 is empty", queueManager.isEmpty(q));
+
+ // the appAttempt finished, the queue should be empty
+ q.removeApp(appAttempt);
+ q = queueManager.getLeafQueue("root.leaf1", false);
+ assertTrue("root.leaf1 is not empty", queueManager.isEmpty(q));
+
+ // removeEmptyDynamicQueues() should remove the queue
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+ q = queueManager.getLeafQueue("root.leaf1", false);
+ assertNull("root.leaf1 has not been removed", q);
+ }
+
+ /**
+ * Checks the handling of a queue whose configured type changes while it is
+ * in use: root.a starts as a configured leaf queue, is marked non-empty,
+ * and is then reconfigured as a parent queue with the child root.a.b. The
+ * test expects root.a to remain a leaf queue while it is marked non-empty
+ * and to be available as a parent queue after notEmptyQueues is cleared and
+ * the cleanup calls run again.
+ */
+ @Test
+ public void testRemovalOfIncompatibleNonEmptyQueue()
+ throws Exception {
+ AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+ allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.a");
+ // the scheduler's own allocConf field is overwritten as well, not just the
+ // configuration handed to the queue manager
+ scheduler.allocConf = allocConf;
+ queueManager.updateAllocationConfiguration(allocConf);
+
+ FSLeafQueue q = queueManager.getLeafQueue("root.a", true);
+ assertNotNull("root.a does not exist", q);
+ assertTrue("root.a is not empty", queueManager.isEmpty(q));
+
+ // we start to run an application on root.a
+ notEmptyQueues.add(q);
+ q = queueManager.getLeafQueue("root.a", false);
+ assertNotNull("root.a does not exist", q);
+ assertFalse("root.a is empty", queueManager.isEmpty(q));
+
+ // root.a should not be removed by removeEmptyDynamicQueues or by
+ // removePendingIncompatibleQueues
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+ q = queueManager.getLeafQueue("root.a", false);
+ assertNotNull("root.a does not exist", q);
+
+ // let's introduce queue incompatibility
+ allocConf.configuredQueues.get(FSQueueType.LEAF).remove("root.a");
+ allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.a");
+ allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.a.b");
+ queueManager.updateAllocationConfiguration(allocConf);
+
+ // since root.a has running applications, it should be still a leaf queue
+ q = queueManager.getLeafQueue("root.a", false);
+ assertNotNull("root.a has been removed", q);
+ assertFalse("root.a is empty", queueManager.isEmpty(q));
+
+ // removePendingIncompatibleQueues should still keep root.a as a leaf queue
+ queueManager.removePendingIncompatibleQueues();
+ q = queueManager.getLeafQueue("root.a", false);
+ assertNotNull("root.a has been removed", q);
+ assertFalse("root.a is empty", queueManager.isEmpty(q));
+
+ // when the application finishes, root.a should be a parent queue
+ notEmptyQueues.clear();
+ queueManager.removePendingIncompatibleQueues();
+ queueManager.removeEmptyDynamicQueues();
+ FSParentQueue p = queueManager.getParentQueue("root.a", false);
+ assertNotNull("root.a does not exist", p);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org