You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2015/06/03 22:49:33 UTC
hadoop git commit: YARN-3762. FairScheduler: CME on
FSParentQueue#getQueueUserAclInfo. (kasha)
Repository: hadoop
Updated Branches:
refs/heads/trunk 107da29ff -> edb9cd0f7
YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/edb9cd0f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/edb9cd0f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/edb9cd0f
Branch: refs/heads/trunk
Commit: edb9cd0f7aa1ecaf34afaa120e3d79583e0ec689
Parents: 107da29
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Jun 3 13:47:24 2015 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Jun 3 13:47:24 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../scheduler/fair/FSParentQueue.java | 219 ++++++++++++++-----
.../scheduler/fair/QueueManager.java | 3 +-
3 files changed, 164 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/edb9cd0f/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 64f6abb..7948637 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -482,6 +482,8 @@ Release 2.8.0 - UNRELEASED
YARN-3751. Fixed AppInfo to check if used resources are null. (Sunil G via
zjshen)
+ YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/edb9cd0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index f74106a..7d2e5b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -23,6 +23,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,36 +47,64 @@ public class FSParentQueue extends FSQueue {
private static final Log LOG = LogFactory.getLog(
FSParentQueue.class.getName());
- private final List<FSQueue> childQueues =
- new ArrayList<FSQueue>();
+ private final List<FSQueue> childQueues = new ArrayList<>();
private Resource demand = Resources.createResource(0);
private int runnableApps;
-
+
+ private ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private Lock readLock = rwLock.readLock();
+ private Lock writeLock = rwLock.writeLock();
+
public FSParentQueue(String name, FairScheduler scheduler,
FSParentQueue parent) {
super(name, scheduler, parent);
}
public void addChildQueue(FSQueue child) {
- childQueues.add(child);
+ writeLock.lock();
+ try {
+ childQueues.add(child);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public void removeChildQueue(FSQueue child) {
+ writeLock.lock();
+ try {
+ childQueues.remove(child);
+ } finally {
+ writeLock.unlock();
+ }
}
@Override
public void recomputeShares() {
- policy.computeShares(childQueues, getFairShare());
- for (FSQueue childQueue : childQueues) {
- childQueue.getMetrics().setFairShare(childQueue.getFairShare());
- childQueue.recomputeShares();
+ readLock.lock();
+ try {
+ policy.computeShares(childQueues, getFairShare());
+ for (FSQueue childQueue : childQueues) {
+ childQueue.getMetrics().setFairShare(childQueue.getFairShare());
+ childQueue.recomputeShares();
+ }
+ } finally {
+ readLock.unlock();
}
}
public void recomputeSteadyShares() {
- policy.computeSteadyShares(childQueues, getSteadyFairShare());
- for (FSQueue childQueue : childQueues) {
- childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
- if (childQueue instanceof FSParentQueue) {
- ((FSParentQueue) childQueue).recomputeSteadyShares();
+ readLock.lock();
+ try {
+ policy.computeSteadyShares(childQueues, getSteadyFairShare());
+ for (FSQueue childQueue : childQueues) {
+ childQueue.getMetrics()
+ .setSteadyFairShare(childQueue.getSteadyFairShare());
+ if (childQueue instanceof FSParentQueue) {
+ ((FSParentQueue) childQueue).recomputeSteadyShares();
+ }
}
+ } finally {
+ readLock.unlock();
}
}
@@ -81,21 +112,37 @@ public class FSParentQueue extends FSQueue {
public void updatePreemptionVariables() {
super.updatePreemptionVariables();
// For child queues
- for (FSQueue childQueue : childQueues) {
- childQueue.updatePreemptionVariables();
+
+ readLock.lock();
+ try {
+ for (FSQueue childQueue : childQueues) {
+ childQueue.updatePreemptionVariables();
+ }
+ } finally {
+ readLock.unlock();
}
}
@Override
public Resource getDemand() {
- return demand;
+ readLock.lock();
+ try {
+ return Resource.newInstance(demand.getMemory(), demand.getVirtualCores());
+ } finally {
+ readLock.unlock();
+ }
}
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
- for (FSQueue child : childQueues) {
- Resources.addTo(usage, child.getResourceUsage());
+ readLock.lock();
+ try {
+ for (FSQueue child : childQueues) {
+ Resources.addTo(usage, child.getResourceUsage());
+ }
+ } finally {
+ readLock.unlock();
}
return usage;
}
@@ -106,20 +153,25 @@ public class FSParentQueue extends FSQueue {
// Limit demand to maxResources
Resource maxRes = scheduler.getAllocationConfiguration()
.getMaxResources(getName());
- demand = Resources.createResource(0);
- for (FSQueue childQueue : childQueues) {
- childQueue.updateDemand();
- Resource toAdd = childQueue.getDemand();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Counting resource from " + childQueue.getName() + " " +
- toAdd + "; Total resource consumption for " + getName() +
- " now " + demand);
- }
- demand = Resources.add(demand, toAdd);
- demand = Resources.componentwiseMin(demand, maxRes);
- if (Resources.equals(demand, maxRes)) {
- break;
+ writeLock.lock();
+ try {
+ demand = Resources.createResource(0);
+ for (FSQueue childQueue : childQueues) {
+ childQueue.updateDemand();
+ Resource toAdd = childQueue.getDemand();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Counting resource from " + childQueue.getName() + " " +
+ toAdd + "; Total resource consumption for " + getName() +
+ " now " + demand);
+ }
+ demand = Resources.add(demand, toAdd);
+ demand = Resources.componentwiseMin(demand, maxRes);
+ if (Resources.equals(demand, maxRes)) {
+ break;
+ }
}
+ } finally {
+ writeLock.unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand +
@@ -127,33 +179,31 @@ public class FSParentQueue extends FSQueue {
}
}
- private synchronized QueueUserACLInfo getUserAclInfo(
- UserGroupInformation user) {
- QueueUserACLInfo userAclInfo =
- recordFactory.newRecordInstance(QueueUserACLInfo.class);
- List<QueueACL> operations = new ArrayList<QueueACL>();
+ private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) {
+ List<QueueACL> operations = new ArrayList<>();
for (QueueACL operation : QueueACL.values()) {
if (hasAccess(operation, user)) {
operations.add(operation);
}
}
-
- userAclInfo.setQueueName(getQueueName());
- userAclInfo.setUserAcls(operations);
- return userAclInfo;
+ return QueueUserACLInfo.newInstance(getQueueName(), operations);
}
@Override
- public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
- UserGroupInformation user) {
+ public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
// Add queue acls
userAcls.add(getUserAclInfo(user));
// Add children queue acls
- for (FSQueue child : childQueues) {
- userAcls.addAll(child.getQueueUserAclInfo(user));
+ readLock.lock();
+ try {
+ for (FSQueue child : childQueues) {
+ userAcls.addAll(child.getQueueUserAclInfo(user));
+ }
+ } finally {
+ readLock.unlock();
}
return userAcls;
@@ -168,12 +218,32 @@ public class FSParentQueue extends FSQueue {
return assigned;
}
- Collections.sort(childQueues, policy.getComparator());
- for (FSQueue child : childQueues) {
- assigned = child.assignContainer(node);
- if (!Resources.equals(assigned, Resources.none())) {
- break;
+ // Hold the write lock when sorting childQueues
+ writeLock.lock();
+ try {
+ Collections.sort(childQueues, policy.getComparator());
+ } finally {
+ writeLock.unlock();
+ }
+
+ /*
+ * We are releasing the lock between the sort and iteration of the
+ * "sorted" list. There could be changes to the list here:
+ * 1. Add a child queue to the end of the list, this doesn't affect
+ * container assignment.
+ * 2. Remove a child queue, this is probably good to take care of so we
+ * don't assign to a queue that is going to be removed shortly.
+ */
+ readLock.lock();
+ try {
+ for (FSQueue child : childQueues) {
+ assigned = child.assignContainer(node);
+ if (!Resources.equals(assigned, Resources.none())) {
+ break;
+ }
}
+ } finally {
+ readLock.unlock();
}
return assigned;
}
@@ -185,11 +255,17 @@ public class FSParentQueue extends FSQueue {
// Find the childQueue which is most over fair share
FSQueue candidateQueue = null;
Comparator<Schedulable> comparator = policy.getComparator();
- for (FSQueue queue : childQueues) {
- if (candidateQueue == null ||
- comparator.compare(queue, candidateQueue) > 0) {
- candidateQueue = queue;
+
+ readLock.lock();
+ try {
+ for (FSQueue queue : childQueues) {
+ if (candidateQueue == null ||
+ comparator.compare(queue, candidateQueue) > 0) {
+ candidateQueue = queue;
+ }
}
+ } finally {
+ readLock.unlock();
}
// Let the selected queue choose which of its container to preempt
@@ -201,7 +277,12 @@ public class FSParentQueue extends FSQueue {
@Override
public List<FSQueue> getChildQueues() {
- return childQueues;
+ readLock.lock();
+ try {
+ return Collections.unmodifiableList(childQueues);
+ } finally {
+ readLock.unlock();
+ }
}
@Override
@@ -218,23 +299,43 @@ public class FSParentQueue extends FSQueue {
}
public void incrementRunnableApps() {
- runnableApps++;
+ writeLock.lock();
+ try {
+ runnableApps++;
+ } finally {
+ writeLock.unlock();
+ }
}
public void decrementRunnableApps() {
- runnableApps--;
+ writeLock.lock();
+ try {
+ runnableApps--;
+ } finally {
+ writeLock.unlock();
+ }
}
@Override
public int getNumRunnableApps() {
- return runnableApps;
+ readLock.lock();
+ try {
+ return runnableApps;
+ } finally {
+ readLock.unlock();
+ }
}
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
- for (FSQueue childQueue : childQueues) {
- childQueue.collectSchedulerApplications(apps);
+ readLock.lock();
+ try {
+ for (FSQueue childQueue : childQueues) {
+ childQueue.collectSchedulerApplications(apps);
+ }
+ } finally {
+ readLock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/edb9cd0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 64442ab..6556717 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -304,7 +304,8 @@ public class QueueManager {
}
}
queues.remove(queue.getName());
- queue.getParent().getChildQueues().remove(queue);
+ FSParentQueue parent = queue.getParent();
+ parent.removeChildQueue(queue);
}
/**